From bdf84dc9a4841b36e6d408e399cfa688fff09aa5 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Thu, 21 Mar 2024 08:17:54 -0400 Subject: [PATCH 1/8] Add more file-like methods to the python bindings --- bindings/python/src/file.rs | 109 +++++++++++++++++++++++++++++ bindings/python/tests/test_read.py | 9 +++ 2 files changed, 118 insertions(+) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index 3847eddb385c..d6b4012506d2 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -18,6 +18,7 @@ // Remove this `allow` after fixed. #![allow(clippy::unnecessary_fallible_conversions)] +use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::io::Write; @@ -28,6 +29,7 @@ use futures::AsyncSeekExt; use futures::AsyncWriteExt; use pyo3::exceptions::PyIOError; use pyo3::exceptions::PyValueError; +use pyo3::buffer::PyBuffer; use pyo3::prelude::*; use pyo3_asyncio::tokio::future_into_py; use tokio::sync::Mutex; @@ -93,6 +95,41 @@ impl File { Buffer::new(buffer).into_bytes_ref(py) } + /// Read bytes into a pre-allocated, writable buffer + pub fn readinto(&mut self, buffer: PyBuffer) -> PyResult { + let reader = match &mut self.0 { + FileState::Reader(r) => r, + FileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + + if buffer.readonly() { + return Err(PyIOError::new_err("Buffer is not writable.")) + } + + if !buffer.is_c_contiguous() { + return Err(PyIOError::new_err("Buffer is not C contiguous.")) + } + + Python::with_gil(|_py| { + let ptr = buffer.buf_ptr(); + let nbytes = buffer.len_bytes(); + unsafe { + let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes); + let z = Read::read(reader, view)?; + Ok(z) + } + }) + } + /// Write bytes into the file. pub fn write(&mut self, bs: &[u8]) -> PyResult { let writer = match &mut self.0 { @@ -194,6 +231,45 @@ impl File { ) -> PyResult<()> { self.close() } + + /// Flush the underlying writer. Is a no-op if the file is opened in reading mode. + pub fn flush(&mut self) -> PyResult<()> { + if matches!(self.0, FileState::Reader(_)) { + Ok(()) + } else { + if let FileState::Writer(w) = &mut self.0 { + match w.flush() { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } else { + Ok(()) + } + } + } + + /// Return True if the stream can be read from. + pub fn readable(&self) -> PyResult { + Ok(matches!(self.0, FileState::Reader(_))) + } + + /// Return True if the stream can be written to. + pub fn writable(&self) -> PyResult { + Ok(matches!(self.0, FileState::Writer(_))) + } + + /// Return True if the stream can be repositioned. + /// + /// In OpenDAL this is limited to only *readable* streams. + pub fn seekable(&self) -> PyResult { + self.readable() + } + + /// Return True if the stream is closed. + #[getter] + pub fn closed(&self) -> PyResult { + Ok(matches!(self.0, FileState::Closed)) + } } /// A file-like async reader. @@ -392,4 +468,37 @@ impl AsyncFile { ) -> PyResult<&'a PyAny> { self.close(py) } + + /// Check if the stream may be read from. + pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Reader(_))) + }) + } + + /// Check if the stream may be written to. + pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Writer(_))) + }) + } + + /// Check if the stream reader may be re-located. + pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + self.readable(py) + } + + /// Check if the stream is closed. + #[getter] + pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let state = state.lock().await; + Ok(matches!(*state, AsyncFileState::Closed)) + }) + } } diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index 9c219d1a64b5..94282c392bad 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -45,6 +45,10 @@ def test_sync_reader(service_name, operator, async_operator): operator.write(filename, content) with operator.open(filename, "rb") as reader: + assert reader.readable() + assert not reader.writable() + assert not reader.closed + read_content = reader.read() assert read_content is not None assert read_content == content @@ -54,6 +58,11 @@ def test_sync_reader(service_name, operator, async_operator): assert read_content is not None assert read_content == content + buf = bytearray(size = 1) + with operator.open(filename, 'rb') as reader: + reader.readinto(buf) + assert buf == content + operator.delete(filename) From c27f605d0465e871efad81babc85aa67652f69b2 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Thu, 21 Mar 2024 22:16:06 -0400 Subject: [PATCH 2/8] Address code formatting lints --- bindings/python/src/file.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index d6b4012506d2..e4ab1ae42f05 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -27,9 +27,9 @@ use std::sync::Arc; use futures::AsyncSeekExt; use futures::AsyncWriteExt; +use pyo3::buffer::PyBuffer; use pyo3::exceptions::PyIOError; use pyo3::exceptions::PyValueError; -use pyo3::buffer::PyBuffer; use pyo3::prelude::*; use pyo3_asyncio::tokio::future_into_py; use tokio::sync::Mutex; @@ -112,11 +112,11 @@ impl File { }; if buffer.readonly() { - return Err(PyIOError::new_err("Buffer is not writable.")) + return Err(PyIOError::new_err("Buffer is not writable.")); } if !buffer.is_c_contiguous() { - return Err(PyIOError::new_err("Buffer is not C contiguous.")) + return Err(PyIOError::new_err("Buffer is not C contiguous.")); } Python::with_gil(|_py| { From 03f4b960be22126898c6d23a1e6075008802ba0d Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Thu, 21 Mar 2024 22:36:41 -0400 Subject: [PATCH 3/8] Check service capabilities --- bindings/python/src/file.rs | 42 +++++++++++++++++++++++---------- bindings/python/src/operator.rs | 11 +++++---- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index e4ab1ae42f05..5bcf4ba7b0b9 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -39,7 +39,7 @@ use crate::*; /// A file-like object. /// Can be used as a context manager. #[pyclass(module = "opendal")] -pub struct File(FileState); +pub struct File(FileState, Capability); enum FileState { Reader(ocore::BlockingReader), @@ -48,12 +48,12 @@ enum FileState { } impl File { - pub fn new_reader(reader: ocore::BlockingReader) -> Self { - Self(FileState::Reader(reader)) + pub fn new_reader(reader: ocore::BlockingReader, capability: Capability) -> Self { + Self(FileState::Reader(reader), capability) } - pub fn new_writer(writer: ocore::BlockingWriter) -> Self { - Self(FileState::Writer(writer)) + pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self { + Self(FileState::Writer(writer), capability) } } @@ -163,6 +163,11 @@ impl File { /// Return the new absolute position. #[pyo3(signature = (pos, whence = 0))] pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult { + if !self.seekable()? { + return Err(PyIOError::new_err( + "Seek operation is not supported by the backing service.", + )); + } let reader = match &mut self.0 { FileState::Reader(r) => r, FileState::Writer(_) => { @@ -262,7 +267,14 @@ impl File { /// /// In OpenDAL this is limited to only *readable* streams. pub fn seekable(&self) -> PyResult { - self.readable() + match &self.0 { + FileState::Reader(_) => { + Ok(self.1.read_can_seek) + }, + _ => Ok(false) + } + + // self.readable() } /// Return True if the stream is closed. @@ -275,7 +287,7 @@ impl File { /// A file-like async reader. /// Can be used as an async context manager. #[pyclass(module = "opendal")] -pub struct AsyncFile(Arc>); +pub struct AsyncFile(Arc>, Capability); enum AsyncFileState { Reader(ocore::Reader), @@ -284,12 +296,12 @@ enum AsyncFileState { } impl AsyncFile { - pub fn new_reader(reader: ocore::Reader) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader)))) + pub fn new_reader(reader: ocore::Reader, capability: Capability) -> Self { + Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))), capability) } - pub fn new_writer(writer: ocore::Writer) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer)))) + pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self { + Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))), capability) } } @@ -489,7 +501,13 @@ impl AsyncFile { /// Check if the stream reader may be re-located. pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { - self.readable(py) + if self.1.read_can_seek { + self.readable(py) + } else { + future_into_py(py, async move { + Ok(false) + }) + } } /// Check if the stream is closed. diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs index e5194886e38a..92e44a711b1a 100644 --- a/bindings/python/src/operator.rs +++ b/bindings/python/src/operator.rs @@ -78,13 +78,13 @@ impl Operator { /// Open a file-like reader for the given path. pub fn open(&self, path: String, mode: String) -> PyResult { let this = self.0.clone(); - + let capability = self.capability()?; if mode == "rb" { let r = this.reader(&path).map_err(format_pyerr)?; - Ok(File::new_reader(r)) + Ok(File::new_reader(r, capability)) } else if mode == "wb" { let w = this.writer(&path).map_err(format_pyerr)?; - Ok(File::new_writer(w)) + Ok(File::new_writer(w, capability)) } else { Err(UnsupportedError::new_err(format!( "OpenDAL doesn't support mode: {mode}" @@ -240,14 +240,15 @@ impl AsyncOperator { /// Open a file-like reader for the given path. pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); + let capability = self.capability()?; future_into_py(py, async move { if mode == "rb" { let r = this.reader(&path).await.map_err(format_pyerr)?; - Ok(AsyncFile::new_reader(r)) + Ok(AsyncFile::new_reader(r, capability)) } else if mode == "wb" { let w = this.writer(&path).await.map_err(format_pyerr)?; - Ok(AsyncFile::new_writer(w)) + Ok(AsyncFile::new_writer(w, capability)) } else { Err(UnsupportedError::new_err(format!( "OpenDAL doesn't support mode: {mode}" From 907f89a8d651a1228b5e26ff30701344952c32e2 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Thu, 21 Mar 2024 22:40:01 -0400 Subject: [PATCH 4/8] Style --- bindings/python/src/file.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index 5bcf4ba7b0b9..d3e41b3fed25 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -268,13 +268,9 @@ impl File { /// In OpenDAL this is limited to only *readable* streams. pub fn seekable(&self) -> PyResult { match &self.0 { - FileState::Reader(_) => { - Ok(self.1.read_can_seek) - }, + FileState::Reader(_) => Ok(self.1.read_can_seek), _ => Ok(false) } - - // self.readable() } /// Return True if the stream is closed. @@ -297,11 +293,17 @@ enum AsyncFileState { impl AsyncFile { pub fn new_reader(reader: ocore::Reader, capability: Capability) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))), capability) + Self( + Arc::new(Mutex::new(AsyncFileState::Reader(reader))), + capability + ) } pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))), capability) + Self( + Arc::new(Mutex::new(AsyncFileState::Writer(writer))), + capability + ) } } @@ -504,9 +506,7 @@ impl AsyncFile { if self.1.read_can_seek { self.readable(py) } else { - future_into_py(py, async move { - Ok(false) - }) + future_into_py(py, async move { Ok(false) }) } } From 26faa630e09c798a260bf515883d18d1fdb44ba2 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Thu, 21 Mar 2024 22:43:43 -0400 Subject: [PATCH 5/8] More style --- bindings/python/src/file.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index d3e41b3fed25..dc9a12f5cafd 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -269,7 +269,7 @@ impl File { pub fn seekable(&self) -> PyResult { match &self.0 { FileState::Reader(_) => Ok(self.1.read_can_seek), - _ => Ok(false) + _ => Ok(false), } } @@ -295,14 +295,14 @@ impl AsyncFile { pub fn new_reader(reader: ocore::Reader, capability: Capability) -> Self { Self( Arc::new(Mutex::new(AsyncFileState::Reader(reader))), - capability + capability, ) } pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self { Self( Arc::new(Mutex::new(AsyncFileState::Writer(writer))), - capability + capability, ) } } From 73b3e7e55881cd82f0063915327c9e6e4338a8e3 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Fri, 12 Apr 2024 23:26:00 -0400 Subject: [PATCH 6/8] Fix up bindings --- bindings/python/src/file.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index e4980c09a874..a4a360a7bad8 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -25,9 +25,9 @@ use std::io::Write; use std::ops::DerefMut; use std::sync::Arc; +use futures::{AsyncReadExt, AsyncSeekExt}; use pyo3::buffer::PyBuffer; -use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use pyo3::exceptions::{PyValueError, PyIOError}; +use pyo3::exceptions::{PyIOError, PyValueError}; use pyo3::prelude::*; use pyo3_asyncio::tokio::future_into_py; use tokio::sync::Mutex; @@ -47,7 +47,10 @@ enum FileState { impl File { pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self { - Self(FileState::Reader(reader.into_std_io_read(0..size)), capability) + Self( + FileState::Reader(reader.into_std_io_read(0..size)), + capability, + ) } pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self { @@ -268,7 +271,7 @@ impl File { /// In OpenDAL this is limited to only *readable* streams. pub fn seekable(&self) -> PyResult { match &self.0 { - FileState::Reader(_) => Ok(self.1.read_can_seek), + FileState::Reader(_) => Ok(true), _ => Ok(false), } } @@ -293,10 +296,12 @@ enum AsyncFileState { impl AsyncFile { pub fn new_reader(reader: ocore::Reader, size: u64, capability: Capability) -> Self { - Self(Arc::new(Mutex::new(AsyncFileState::Reader( - reader.into_futures_io_async_read(0..size), + Self( + Arc::new(Mutex::new(AsyncFileState::Reader( + reader.into_futures_io_async_read(0..size), + ))), capability, - )))) + ) } pub fn new_writer(writer: ocore::Writer, capability: Capability) -> Self { @@ -507,7 +512,7 @@ impl AsyncFile { /// Check if the stream reader may be re-located. pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { - if self.1.read_can_seek { + if true { self.readable(py) } else { future_into_py(py, async move { Ok(false) }) From 7f3cf4d69397820e345dbca96922ab08c29e341b Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Sat, 13 Apr 2024 09:30:41 -0400 Subject: [PATCH 7/8] kwargs correction --- bindings/python/tests/test_read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index 94282c392bad..69cb3d339df3 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -58,7 +58,7 @@ def test_sync_reader(service_name, operator, async_operator): assert read_content is not None assert read_content == content - buf = bytearray(size = 1) + buf = bytearray(1) with operator.open(filename, 'rb') as reader: reader.readinto(buf) assert buf == content From a8112f58c76c06ff52080529aa8a2c9d63e7dd41 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Sat, 13 Apr 2024 09:45:45 -0400 Subject: [PATCH 8/8] Fix expectation when testing readinto --- bindings/python/tests/test_read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index 69cb3d339df3..2c0b515ff164 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -61,7 +61,7 @@ def test_sync_reader(service_name, operator, async_operator): buf = bytearray(1) with operator.open(filename, 'rb') as reader: reader.readinto(buf) - assert buf == content + assert buf == content[:1] operator.delete(filename)