diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index e407a8568e69..a4a360a7bad8 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -26,8 +26,8 @@ use std::ops::DerefMut; use std::sync::Arc; use futures::{AsyncReadExt, AsyncSeekExt}; -use pyo3::exceptions::PyIOError; -use pyo3::exceptions::PyValueError; +use pyo3::buffer::PyBuffer; +use pyo3::exceptions::{PyIOError, PyValueError}; use pyo3::prelude::*; use pyo3_asyncio::tokio::future_into_py; use tokio::sync::Mutex; @@ -37,7 +37,7 @@ use crate::*; /// A file-like object. /// Can be used as a context manager. #[pyclass(module = "opendal")] -pub struct File(FileState); +pub struct File(FileState, Capability); enum FileState { Reader(ocore::StdIoReader), @@ -46,12 +46,15 @@ enum FileState { } impl File { - pub fn new_reader(reader: ocore::BlockingReader, size: u64) -> Self { - Self(FileState::Reader(reader.into_std_io_read(0..size))) + pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self { + Self( + FileState::Reader(reader.into_std_io_read(0..size)), + capability, + ) } - pub fn new_writer(writer: ocore::BlockingWriter) -> Self { - Self(FileState::Writer(writer)) + pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self { + Self(FileState::Writer(writer), capability) } } @@ -95,6 +98,41 @@ impl File { Buffer::new(buffer).into_bytes_ref(py) } + /// Read bytes into a pre-allocated, writable buffer + pub fn readinto(&mut self, buffer: PyBuffer) -> PyResult { + let reader = match &mut self.0 { + FileState::Reader(r) => r, + FileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + + if buffer.readonly() { + return Err(PyIOError::new_err("Buffer is not writable.")); + } + + if !buffer.is_c_contiguous() { + return Err(PyIOError::new_err("Buffer is not C contiguous.")); + } + + Python::with_gil(|_py| { + let ptr = buffer.buf_ptr(); + let nbytes = buffer.len_bytes(); + unsafe { + let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes); + let z = Read::read(reader, view)?; + Ok(z) + } + }) + } + /// Write bytes into the file. pub fn write(&mut self, bs: &[u8]) -> PyResult { let writer = match &mut self.0 { @@ -128,6 +166,11 @@ impl File { /// Return the new absolute position. #[pyo3(signature = (pos, whence = 0))] pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult { + if !self.seekable()? { + return Err(PyIOError::new_err( + "Seek operation is not supported by the backing service.", + )); + } let reader = match &mut self.0 { FileState::Reader(r) => r, FileState::Writer(_) => { @@ -196,12 +239,54 @@ impl File { ) -> PyResult<()> { self.close() } + + /// Flush the underlying writer. Is a no-op if the file is opened in reading mode. + pub fn flush(&mut self) -> PyResult<()> { + if matches!(self.0, FileState::Reader(_)) { + Ok(()) + } else { + if let FileState::Writer(w) = &mut self.0 { + match w.flush() { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } else { + Ok(()) + } + } + } + + /// Return True if the stream can be read from. + pub fn readable(&self) -> PyResult { + Ok(matches!(self.0, FileState::Reader(_))) + } + + /// Return True if the stream can be written to. + pub fn writable(&self) -> PyResult { + Ok(matches!(self.0, FileState::Writer(_))) + } + + /// Return True if the stream can be repositioned. + /// + /// In OpenDAL this is limited to only *readable* streams. + pub fn seekable(&self) -> PyResult { + match &self.0 { + FileState::Reader(_) => Ok(true), + _ => Ok(false), + } + } + + /// Return True if the stream is closed. + #[getter] + pub fn closed(&self) -> PyResult { + Ok(matches!(self.0, FileState::Closed)) + } } /// A file-like async reader. /// Can be used as an async context manager. #[pyclass(module = "opendal")] -pub struct AsyncFile(Arc>); +pub struct AsyncFile(Arc>, Capability); enum AsyncFileState { Reader(ocore::FuturesIoAsyncReader), @@ -210,14 +295,20 @@ enum AsyncFileState { } impl AsyncFile { - pub fn new_reader(reader: ocore::Reader, size: u64) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Reader( - reader.into_futures_io_async_read(0..size), - )))) + pub fn new_reader(reader: ocore::Reader, size: u64, capability: Capability) -> Self { + Self( + Arc::new(Mutex::new(AsyncFileState::Reader( + reader.into_futures_io_async_read(0..size), + ))), + capability, + ) } - pub fn new_writer(writer: ocore::Writer) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer)))) + pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self { + Self( + Arc::new(Mutex::new(AsyncFileState::Writer(writer))), + capability, + ) } } @@ -400,4 +491,41 @@ impl AsyncFile { ) -> PyResult<&'a PyAny> { self.close(py) } + + /// Check if the stream may be read from. + pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Reader(_))) + }) + } + + /// Check if the stream may be written to. + pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Writer(_))) + }) + } + + /// Check if the stream reader may be re-located. + pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + if true { + self.readable(py) + } else { + future_into_py(py, async move { Ok(false) }) + } + } + + /// Check if the stream is closed. + #[getter] + pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Closed)) + }) + } } diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs index 86d03fce21f9..f183fe61428c 100644 --- a/bindings/python/src/operator.rs +++ b/bindings/python/src/operator.rs @@ -78,14 +78,14 @@ impl Operator { /// Open a file-like reader for the given path. pub fn open(&self, path: String, mode: String) -> PyResult { let this = self.0.clone(); - + let capability = self.capability()?; if mode == "rb" { let meta = this.stat(&path).map_err(format_pyerr)?; let r = this.reader(&path).map_err(format_pyerr)?; - Ok(File::new_reader(r, meta.content_length())) + Ok(File::new_reader(r, meta.content_length(), capability)) } else if mode == "wb" { let w = this.writer(&path).map_err(format_pyerr)?; - Ok(File::new_writer(w)) + Ok(File::new_writer(w, capability)) } else { Err(UnsupportedError::new_err(format!( "OpenDAL doesn't support mode: {mode}" @@ -241,15 +241,16 @@ impl AsyncOperator { /// Open a file-like reader for the given path. pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); + let capability = self.capability()?; future_into_py(py, async move { if mode == "rb" { let meta = this.stat(&path).await.map_err(format_pyerr)?; let r = this.reader(&path).await.map_err(format_pyerr)?; - Ok(AsyncFile::new_reader(r, meta.content_length())) + Ok(AsyncFile::new_reader(r, meta.content_length(), capability)) } else if mode == "wb" { let w = this.writer(&path).await.map_err(format_pyerr)?; - Ok(AsyncFile::new_writer(w)) + Ok(AsyncFile::new_writer(w, capability)) } else { Err(UnsupportedError::new_err(format!( "OpenDAL doesn't support mode: {mode}" diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index 9c219d1a64b5..2c0b515ff164 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -45,6 +45,10 @@ def test_sync_reader(service_name, operator, async_operator): operator.write(filename, content) with operator.open(filename, "rb") as reader: + assert reader.readable() + assert not reader.writable() + assert not reader.closed + read_content = reader.read() assert read_content is not None assert read_content == content @@ -54,6 +58,11 @@ def test_sync_reader(service_name, operator, async_operator): assert read_content is not None assert read_content == content + buf = bytearray(1) + with operator.open(filename, 'rb') as reader: + reader.readinto(buf) + assert buf == content[:1] + operator.delete(filename)