Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more file-like methods to the python bindings #4384

Merged
156 changes: 142 additions & 14 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use std::ops::DerefMut;
use std::sync::Arc;

use futures::{AsyncReadExt, AsyncSeekExt};
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyValueError;
use pyo3::buffer::PyBuffer;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use tokio::sync::Mutex;
Expand All @@ -37,7 +37,7 @@ use crate::*;
/// A file-like object.
/// Can be used as a context manager.
#[pyclass(module = "opendal")]
pub struct File(FileState);
pub struct File(FileState, Capability);

enum FileState {
Reader(ocore::StdIoReader),
Expand All @@ -46,12 +46,15 @@ enum FileState {
}

impl File {
pub fn new_reader(reader: ocore::BlockingReader, size: u64) -> Self {
Self(FileState::Reader(reader.into_std_io_read(0..size)))
pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self {
Self(
FileState::Reader(reader.into_std_io_read(0..size)),
capability,
)
}

pub fn new_writer(writer: ocore::BlockingWriter) -> Self {
Self(FileState::Writer(writer))
pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self {
Self(FileState::Writer(writer), capability)
}
}

Expand Down Expand Up @@ -95,6 +98,41 @@ impl File {
Buffer::new(buffer).into_bytes_ref(py)
}

/// Read bytes into a pre-allocated, writable buffer.
///
/// `buffer` is any Python object exporting the buffer protocol with `u8` items
/// (for example a `bytearray`). At most `buffer.len_bytes()` bytes are written,
/// starting at the beginning of the buffer, using a single `Read::read` call on
/// the underlying reader.
///
/// Returns the number of bytes actually read.
///
/// # Errors
///
/// Returns `PyIOError` when the file is write-only or closed, when `buffer` is
/// read-only, or when `buffer` is not C contiguous. An error from the
/// underlying read is converted and propagated.
pub fn readinto(&mut self, buffer: PyBuffer<u8>) -> PyResult<usize> {
    let reader = match &mut self.0 {
        FileState::Reader(r) => r,
        FileState::Writer(_) => {
            return Err(PyIOError::new_err(
                "I/O operation failed for reading on write only file.",
            ));
        }
        FileState::Closed => {
            return Err(PyIOError::new_err(
                "I/O operation failed for reading on closed file.",
            ));
        }
    };

    if buffer.readonly() {
        return Err(PyIOError::new_err("Buffer is not writable."));
    }

    if !buffer.is_c_contiguous() {
        return Err(PyIOError::new_err("Buffer is not C contiguous."));
    }

    let nbytes = buffer.len_bytes();
    if nbytes == 0 {
        // Nothing to fill. Returning early also avoids building a slice from the
        // exporter's pointer for an empty buffer; `from_raw_parts_mut` requires a
        // non-null, aligned pointer even for length 0.
        // NOTE(review): whether an exporter may hand out a null pointer for an
        // empty buffer is not visible here — the guard is defensive.
        return Ok(0);
    }

    // Holding the GIL for the whole read keeps other Python threads from touching
    // the exporter's memory while we write into it. `with_gil` is re-entrant, so
    // this is harmless when the caller already holds the GIL.
    Python::with_gil(|_py| {
        let ptr = buffer.buf_ptr().cast::<u8>();
        // SAFETY: the buffer was checked above to be writable and C contiguous, so
        // `ptr` addresses `nbytes` consecutive, writable bytes. `buffer` owns the
        // buffer export and outlives `view`, and `nbytes > 0` was checked above.
        let view: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(ptr, nbytes) };
        Ok(Read::read(reader, view)?)
    })
}

/// Write bytes into the file.
pub fn write(&mut self, bs: &[u8]) -> PyResult<usize> {
let writer = match &mut self.0 {
Expand Down Expand Up @@ -128,6 +166,11 @@ impl File {
/// Return the new absolute position.
#[pyo3(signature = (pos, whence = 0))]
pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
if !self.seekable()? {
return Err(PyIOError::new_err(
"Seek operation is not supported by the backing service.",
));
}
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
Expand Down Expand Up @@ -196,12 +239,54 @@ impl File {
) -> PyResult<()> {
self.close()
}

/// Flush the underlying writer.
///
/// Only a file in the writer state has anything to flush; for every other state
/// (reading mode, closed) this is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Propagates the error returned by the writer's `flush`, converted into a
/// Python exception.
pub fn flush(&mut self) -> PyResult<()> {
    match &mut self.0 {
        FileState::Writer(w) => {
            w.flush()?;
            Ok(())
        }
        // Catch-all on purpose: the original treated every non-writer state as a
        // no-op, and the full enum definition is not visible from this hunk.
        _ => Ok(()),
    }
}

/// Report whether this file can be read from.
///
/// Evaluates to `true` only while the internal state is `FileState::Reader`.
pub fn readable(&self) -> PyResult<bool> {
    let is_reader = matches!(&self.0, FileState::Reader(_));
    Ok(is_reader)
}

/// Report whether this file can be written to.
///
/// Evaluates to `true` only while the internal state is `FileState::Writer`.
pub fn writable(&self) -> PyResult<bool> {
    let is_writer = matches!(&self.0, FileState::Writer(_));
    Ok(is_writer)
}

/// Report whether the stream position can be changed.
///
/// In OpenDAL only *readable* streams can be repositioned, so this is `true`
/// exactly when the internal state is `FileState::Reader`.
pub fn seekable(&self) -> PyResult<bool> {
    let is_reader = matches!(&self.0, FileState::Reader(_));
    Ok(is_reader)
}

/// Report whether this file has been closed.
///
/// Exposed to Python as a read-only property; `true` only while the internal
/// state is `FileState::Closed`.
#[getter]
pub fn closed(&self) -> PyResult<bool> {
    let is_closed = matches!(&self.0, FileState::Closed);
    Ok(is_closed)
}
}

/// A file-like async reader.
/// Can be used as an async context manager.
#[pyclass(module = "opendal")]
pub struct AsyncFile(Arc<Mutex<AsyncFileState>>);
pub struct AsyncFile(Arc<Mutex<AsyncFileState>>, Capability);

enum AsyncFileState {
Reader(ocore::FuturesIoAsyncReader),
Expand All @@ -210,14 +295,20 @@ enum AsyncFileState {
}

impl AsyncFile {
pub fn new_reader(reader: ocore::Reader, size: u64) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Reader(
reader.into_futures_io_async_read(0..size),
))))
pub fn new_reader(reader: ocore::Reader, size: u64, capability: Capability) -> Self {
Self(
Arc::new(Mutex::new(AsyncFileState::Reader(
reader.into_futures_io_async_read(0..size),
))),
capability,
)
}

pub fn new_writer(writer: ocore::Writer) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))))
pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self {
Self(
Arc::new(Mutex::new(AsyncFileState::Writer(writer))),
capability,
)
}
}

Expand Down Expand Up @@ -400,4 +491,41 @@ impl AsyncFile {
) -> PyResult<&'a PyAny> {
self.close(py)
}

/// Check if the stream may be read from.
pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Reader(_)))
})
}

/// Check if the stream may be written to.
pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Writer(_)))
})
}

/// Check if the stream reader may be re-located.
pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
if true {
self.readable(py)
} else {
future_into_py(py, async move { Ok(false) })
}
}

/// Check if the stream is closed.
#[getter]
pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Closed))
})
}
}
11 changes: 6 additions & 5 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ impl Operator {
/// Open a file-like reader for the given path.
pub fn open(&self, path: String, mode: String) -> PyResult<File> {
let this = self.0.clone();

let capability = self.capability()?;
if mode == "rb" {
let meta = this.stat(&path).map_err(format_pyerr)?;
let r = this.reader(&path).map_err(format_pyerr)?;
Ok(File::new_reader(r, meta.content_length()))
Ok(File::new_reader(r, meta.content_length(), capability))
} else if mode == "wb" {
let w = this.writer(&path).map_err(format_pyerr)?;
Ok(File::new_writer(w))
Ok(File::new_writer(w, capability))
} else {
Err(UnsupportedError::new_err(format!(
"OpenDAL doesn't support mode: {mode}"
Expand Down Expand Up @@ -241,15 +241,16 @@ impl AsyncOperator {
/// Open a file-like reader for the given path.
pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
let capability = self.capability()?;

future_into_py(py, async move {
if mode == "rb" {
let meta = this.stat(&path).await.map_err(format_pyerr)?;
let r = this.reader(&path).await.map_err(format_pyerr)?;
Ok(AsyncFile::new_reader(r, meta.content_length()))
Ok(AsyncFile::new_reader(r, meta.content_length(), capability))
} else if mode == "wb" {
let w = this.writer(&path).await.map_err(format_pyerr)?;
Ok(AsyncFile::new_writer(w))
Ok(AsyncFile::new_writer(w, capability))
} else {
Err(UnsupportedError::new_err(format!(
"OpenDAL doesn't support mode: {mode}"
Expand Down
9 changes: 9 additions & 0 deletions bindings/python/tests/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def test_sync_reader(service_name, operator, async_operator):
operator.write(filename, content)

with operator.open(filename, "rb") as reader:
assert reader.readable()
assert not reader.writable()
assert not reader.closed

read_content = reader.read()
assert read_content is not None
assert read_content == content
Expand All @@ -54,6 +58,11 @@ def test_sync_reader(service_name, operator, async_operator):
assert read_content is not None
assert read_content == content

buf = bytearray(1)
with operator.open(filename, 'rb') as reader:
reader.readinto(buf)
assert buf == content[:1]

operator.delete(filename)


Expand Down
Loading