diff --git a/bindings/ruby/lib/opendal.rb b/bindings/ruby/lib/opendal.rb index 28bd98139472..5b1995ea39ed 100644 --- a/bindings/ruby/lib/opendal.rb +++ b/bindings/ruby/lib/opendal.rb @@ -19,3 +19,4 @@ require_relative "opendal_ruby/version" require_relative "opendal_ruby/opendal_ruby" +require_relative "opendal_ruby/io" diff --git a/bindings/ruby/lib/opendal_ruby/io.rb b/bindings/ruby/lib/opendal_ruby/io.rb new file mode 100644 index 000000000000..328433135c1b --- /dev/null +++ b/bindings/ruby/lib/opendal_ruby/io.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +module OpenDAL + class IO + # Reads all lines from the stream into an array. + # Stops when `readline` raises `EOFError`; the error is rescued here, not propagated. + def readlines + results = [] + + loop do + results << readline + rescue EOFError + break + end + + results + end + + # Rewinds the stream to the beginning. + def rewind + seek(0, ::IO::SEEK_SET) + end + + # Sets the file position to `new_position`. + def pos=(new_position) + seek(new_position, ::IO::SEEK_SET) + end + + alias_method :pos, :tell + + # Checks if the stream is at the end of the file. Note: leaves the position at the end of the stream. + def eof + position = tell + seek(0, ::IO::SEEK_END) + tell == position + end + + alias_method :eof?, :eof + + # Returns the total length of the stream.
+ def length + current_position = tell + seek(0, ::IO::SEEK_END) + tell.tap { self.pos = current_position } + end + + alias_method :size, :length + end +end diff --git a/bindings/ruby/lib/opendal_ruby/version.rb b/bindings/ruby/lib/opendal_ruby/version.rb index e5ff6578732d..ca9111d77146 100644 --- a/bindings/ruby/lib/opendal_ruby/version.rb +++ b/bindings/ruby/lib/opendal_ruby/version.rb @@ -18,5 +18,5 @@ # frozen_string_literal: true module OpenDAL - VERSION = "0.1.9" + VERSION = "0.1.10" end diff --git a/bindings/ruby/src/lib.rs b/bindings/ruby/src/lib.rs index 5de42706927a..17826edf188e 100644 --- a/bindings/ruby/src/lib.rs +++ b/bindings/ruby/src/lib.rs @@ -26,6 +26,7 @@ pub use ::opendal as ocore; mod capability; mod metadata; +mod opendal_io; mod operator; pub fn format_magnus_error(err: ocore::Error) -> Error { @@ -39,6 +40,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { let _ = operator::include(&gem_module); let _ = metadata::include(&gem_module); let _ = capability::include(&gem_module); + let _ = opendal_io::include(&gem_module); Ok(()) } diff --git a/bindings/ruby/src/opendal_io.rs b/bindings/ruby/src/opendal_io.rs new file mode 100644 index 000000000000..afa835d2ff1c --- /dev/null +++ b/bindings/ruby/src/opendal_io.rs @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::collections::HashSet; +use std::io::BufRead; +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::io::Write; + +use magnus::class; +use magnus::method; +use magnus::prelude::*; +use magnus::Error; +use magnus::RModule; + +use crate::*; + +// `OpenDALIO` follows similar Ruby IO classes, such as: +// - IO +// - StringIO +// +// `OpenDALIO` is not exactly an `IO` but is unidirectional (either `Reader` or `Writer`). +// TODO: implement encoding. +// +// The name of OpenDALIO is an arbitrary choice. Open to changes before 1.0. +#[magnus::wrap(class = "OpenDAL::IO", free_immediately, size)] +pub struct OpenDALIO(RefCell); + +enum FileState { + Reader(ocore::StdReader, bool), // bool indicates binary mode + Writer(ocore::StdWriter, bool), // bool indicates binary mode + Closed, +} + +pub fn format_io_error(err: std::io::Error) -> Error { + Error::new(exception::runtime_error(), err.to_string()) +} + +impl OpenDALIO { + /// Creates a new `OpenDAL::IO` object in Ruby. + /// + // @param ruby Ruby handle, required for exception handling. + // @param operator OpenDAL operator for file operations. + // @param path Path to the file. + // @param mode Mode string, e.g., "r", "w", or "rb". + // + // The mode must contain unique characters: a repeated character raises an `ArgumentError`, and a mode with neither `r` nor `w` raises a `RuntimeError`. + pub fn new( + ruby: &Ruby, + operator: ocore::BlockingOperator, + path: String, + mode: String, + ) -> Result { + let mut mode_flags = HashSet::new(); + let is_unique = mode.chars().all(|c| mode_flags.insert(c)); + if !is_unique { + return Err(Error::new( + ruby.exception_arg_error(), + format!("Invalid access mode {mode}"), + )); + } + + let binary_mode = mode_flags.contains(&'b'); + + if mode_flags.contains(&'r') { + Ok(Self(RefCell::new(FileState::Reader( + operator + .reader(&path) + .map_err(format_magnus_error)?
+ .into_std_read(..) + .map_err(format_magnus_error)?, + binary_mode, + )))) + } else if mode_flags.contains(&'w') { + Ok(Self(RefCell::new(FileState::Writer( + operator + .writer(&path) + .map_err(format_magnus_error)? + .into_std_write(), + binary_mode, + )))) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + format!("OpenDAL doesn't support mode: {mode}"), + )) + } + } + + /// Enables binary mode for the stream. + fn binary_mode(ruby: &Ruby, rb_self: &Self) -> Result<(), Error> { + let mut cell = rb_self.0.borrow_mut(); + match &mut *cell { + FileState::Reader(_, ref mut is_binary_mode) => { + *is_binary_mode = true; + Ok(()) + } + FileState::Writer(_, ref mut is_binary_mode) => { + *is_binary_mode = true; + Ok(()) + } + FileState::Closed => Err(Error::new(ruby.exception_io_error(), "closed stream")), + } + } + + /// Returns whether the stream is in binary mode. + fn is_binary_mode(ruby: &Ruby, rb_self: &Self) -> Result { + match *rb_self.0.borrow() { + FileState::Reader(_, is_binary_mode) => Ok(is_binary_mode), + FileState::Writer(_, is_binary_mode) => Ok(is_binary_mode), + FileState::Closed => Err(Error::new(ruby.exception_io_error(), "closed stream")), + } + } + + /// Closes the stream and transitions the state to `Closed`. + fn close(&self) -> Result<(), Error> { + // skips closing reader because `StdReader` doesn't have `close()`. + let mut cell = self.0.borrow_mut(); + if let FileState::Writer(writer, _) = &mut *cell { + writer.close().map_err(format_io_error)?; + } + *cell = FileState::Closed; + Ok(()) + } + + /// Marks the stream as `Closed`, whatever its current state; unlike `close`, a writer is not closed first. + fn close_read(&self) -> Result<(), Error> { + *self.0.borrow_mut() = FileState::Closed; + Ok(()) + } + + /// Closes the writer, if there is one, and transitions the state to `Closed`.
+ fn close_write(&self) -> Result<(), Error> { + let mut cell = self.0.borrow_mut(); + if let FileState::Writer(writer, _) = &mut *cell { + writer.close().map_err(format_io_error)?; + } + *cell = FileState::Closed; + Ok(()) + } + + fn is_closed(&self) -> Result { + Ok(matches!(*self.0.borrow(), FileState::Closed)) + } + + fn is_closed_read(&self) -> Result { + Ok(!matches!(*self.0.borrow(), FileState::Reader(_, _))) + } + + fn is_closed_write(&self) -> Result { + Ok(!matches!(*self.0.borrow(), FileState::Writer(_, _))) + } +} + +impl OpenDALIO { + /// Reads data from the stream. + /// TODO: + /// - support default parameters + /// - support encoding + /// + /// @param size The maximum number of bytes to read. Reads all data if `None`. + fn read(ruby: &Ruby, rb_self: &Self, size: Option) -> Result { + // FIXME: consider what to return exactly + if let FileState::Reader(reader, _) = &mut *rb_self.0.borrow_mut() { + let buffer = match size { + Some(size) => { + let mut bs = vec![0; size]; + let n = reader.read(&mut bs).map_err(format_io_error)?; + bs.truncate(n); + bs + } + None => { + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).map_err(format_io_error)?; + buffer + } + }; + + Ok(buffer.into()) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )) + } + } + + /// Reads a single line from the stream. + // TODO: extend readline with parameters + fn readline(ruby: &Ruby, rb_self: &Self) -> Result { + if let FileState::Reader(reader, _) = &mut *rb_self.0.borrow_mut() { + let mut buffer = String::new(); + let size = reader.read_line(&mut buffer).map_err(format_io_error)?; + if size == 0 { + return Err(Error::new( + ruby.exception_eof_error(), + "end of file reached", + )); + } + + Ok(buffer) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )) + } + } + + /// Writes data to the stream. 
+ /// + /// @param bs The string data to write to the stream. + fn write(ruby: &Ruby, rb_self: &Self, bs: String) -> Result { + if let FileState::Writer(writer, _) = &mut *rb_self.0.borrow_mut() { + Ok(writer + .write_all(bs.as_bytes()) + .map(|_| bs.len()) + .map_err(format_io_error)?) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )) + } + } +} + +impl OpenDALIO { + /// Moves the file position based on the offset and whence. + /// + /// @param offset The position offset. + /// @param whence The reference point: + /// - 0 = IO::SEEK_SET (Start) + /// - 1 = IO::SEEK_CUR (Current position) + /// - 2 = IO::SEEK_END (From the end) + fn seek(ruby: &Ruby, rb_self: &Self, offset: i64, whence: u8) -> Result { + match &mut *rb_self.0.borrow_mut() { + FileState::Reader(reader, _) => { + let whence = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(offset), + 2 => SeekFrom::End(offset), + _ => return Err(Error::new(ruby.exception_arg_error(), "invalid whence")), + }; + + reader.seek(whence).map_err(format_io_error)?; + + Ok(0) + } + FileState::Writer(_, _) => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )), + FileState::Closed => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )), + } + } + + /// Returns the current position of the file pointer in the stream. + fn tell(ruby: &Ruby, rb_self: &Self) -> Result { + match &mut *rb_self.0.borrow_mut() { + FileState::Reader(reader, _) => { + Ok(reader.stream_position().map_err(format_io_error)?)
+ } + FileState::Writer(_, _) => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )), + FileState::Closed => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )), + } + } + + // TODO: consider implement: + // - lineno + // - set_lineno + // - getc + // - putc + // - gets + // - puts +} + +/// Defines the `OpenDAL::IO` class in the given Ruby module and binds its methods. +/// +/// This function uses Magnus's built-in Ruby thread-safety features to define the +/// `OpenDAL::IO` class and its methods in the provided Ruby module (`gem_module`). +/// +/// # Ruby Object Lifetime and Safety +/// +/// Ruby objects can only exist in the Ruby heap and are tracked by Ruby's garbage collector (GC). +/// While we can allocate and store Ruby-related objects in the Rust heap, Magnus does not +/// automatically track such objects. Therefore, it is critical to work within Magnus's safety +/// guidelines when integrating Rust objects with Ruby. Read more in the Magnus documentation: +/// [Magnus Safety Documentation](https://github.com/matsadler/magnus#safety). 
+pub fn include(gem_module: &RModule) -> Result<(), Error> { + let class = gem_module.define_class("IO", class::object())?; + class.define_method("binmode", method!(OpenDALIO::binary_mode, 0))?; + class.define_method("binmode?", method!(OpenDALIO::is_binary_mode, 0))?; + class.define_method("close", method!(OpenDALIO::close, 0))?; + class.define_method("close_read", method!(OpenDALIO::close_read, 0))?; + class.define_method("close_write", method!(OpenDALIO::close_write, 0))?; + class.define_method("closed?", method!(OpenDALIO::is_closed, 0))?; + class.define_method("closed_read?", method!(OpenDALIO::is_closed_read, 0))?; + class.define_method("closed_write?", method!(OpenDALIO::is_closed_write, 0))?; + class.define_method("read", method!(OpenDALIO::read, 1))?; + class.define_method("write", method!(OpenDALIO::write, 1))?; + class.define_method("readline", method!(OpenDALIO::readline, 0))?; + class.define_method("seek", method!(OpenDALIO::seek, 2))?; + class.define_method("tell", method!(OpenDALIO::tell, 0))?; + + Ok(()) +} diff --git a/bindings/ruby/src/operator.rs b/bindings/ruby/src/operator.rs index 997b55a607b6..f2e564ceaf51 100644 --- a/bindings/ruby/src/operator.rs +++ b/bindings/ruby/src/operator.rs @@ -24,9 +24,11 @@ use magnus::prelude::*; use magnus::Error; use magnus::RModule; use magnus::RString; +use magnus::Ruby; use crate::capability::Capability; use crate::metadata::Metadata; +use crate::opendal_io::OpenDALIO; use crate::*; #[magnus::wrap(class = "OpenDAL::Operator", free_immediately, size)] @@ -34,39 +36,48 @@ use crate::*; struct Operator(ocore::BlockingOperator); impl Operator { - fn new(scheme: String, options: Option>) -> Result { + fn new( + ruby: &Ruby, + scheme: String, + options: Option>, + ) -> Result { let scheme = ocore::Scheme::from_str(&scheme) .map_err(|err| { ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") .set_source(err) }) - .map_err(format_magnus_error)?; + .map_err(|err| 
Error::new(ruby.exception_runtime_error(), err.to_string()))?; let options = options.unwrap_or_default(); let op = ocore::Operator::via_iter(scheme, options) - .map_err(format_magnus_error)? + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))? .blocking(); Ok(Operator(op)) } /// Reads the whole path into string. - fn read(&self, path: String) -> Result { - let buffer = self.0.read(&path).map_err(format_magnus_error)?; + fn read(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + let buffer = rb_self + .0 + .read(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))?; Ok(buffer.to_bytes()) } /// Writes string into given path. - fn write(&self, path: String, bs: RString) -> Result<(), Error> { - self.0 + fn write(ruby: &Ruby, rb_self: &Self, path: String, bs: RString) -> Result<(), Error> { + rb_self + .0 .write(&path, bs.to_bytes()) - .map_err(format_magnus_error) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Gets current path's metadata **without cache** directly. - fn stat(&self, path: String) -> Result { - self.0 + fn stat(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + rb_self + .0 .stat(&path) - .map_err(format_magnus_error) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) .map(Metadata::new) } @@ -78,33 +89,57 @@ impl Operator { /// Creates directory recursively similar as `mkdir -p` /// The ending path must be `/`. Otherwise, OpenDAL throws `NotADirectory` error. 
- fn create_dir(&self, path: String) -> Result<(), Error> { - self.0.create_dir(&path).map_err(format_magnus_error) + fn create_dir(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .create_dir(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Deletes given path - fn delete(&self, path: String) -> Result<(), Error> { - self.0.delete(&path).map_err(format_magnus_error) + fn delete(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .delete(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Returns if this path exists - fn exists(&self, path: String) -> Result { - self.0.exists(&path).map_err(format_magnus_error) + fn exists(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + rb_self + .0 + .exists(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Renames a file from `from` to `to` - fn rename(&self, from: String, to: String) -> Result<(), Error> { - self.0.rename(&from, &to).map_err(format_magnus_error) + fn rename(ruby: &Ruby, rb_self: &Self, from: String, to: String) -> Result<(), Error> { + rb_self + .0 + .rename(&from, &to) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Removes the path and all nested directories and files recursively - fn remove_all(&self, path: String) -> Result<(), Error> { - self.0.remove_all(&path).map_err(format_magnus_error) + fn remove_all(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .remove_all(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Copies a file from `from` to `to`. 
- fn copy(&self, from: String, to: String) -> Result<(), Error> { - self.0.copy(&from, &to).map_err(format_magnus_error) + fn copy(ruby: &Ruby, rb_self: &Self, from: String, to: String) -> Result<(), Error> { + rb_self + .0 + .copy(&from, &to) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) + } + + /// Opens an IO-like stream for the given path: a reader or a writer, depending on `mode`. + fn open(ruby: &Ruby, rb_self: &Self, path: String, mode: String) -> Result { + let operator = rb_self.0.clone(); + Ok(OpenDALIO::new(&ruby, operator, path, mode)?) + } } @@ -121,6 +156,7 @@ pub fn include(gem_module: &RModule) -> Result<(), Error> { class.define_method("rename", method!(Operator::rename, 2))?; class.define_method("remove_all", method!(Operator::remove_all, 1))?; class.define_method("copy", method!(Operator::copy, 2))?; + class.define_method("open", method!(Operator::open, 2))?; Ok(()) } diff --git a/bindings/ruby/test/blocking_op_test.rb b/bindings/ruby/test/blocking_op_test.rb index b6cee4d716c4..81df513288ec 100644 --- a/bindings/ruby/test/blocking_op_test.rb +++ b/bindings/ruby/test/blocking_op_test.rb @@ -95,4 +95,12 @@ class OpenDalTest < ActiveSupport::TestCase assert File.exist?("#{@root}/sample") assert File.exist?("#{@root}/new_name") end + + test "opens an OpenDALIO" do + io = @op.open("/sample", "rb") + + assert_not io.closed? + + io.close + end end diff --git a/bindings/ruby/test/io_test.rb b/bindings/ruby/test/io_test.rb new file mode 100644 index 000000000000..2b6c0ccb03e1 --- /dev/null +++ b/bindings/ruby/test/io_test.rb @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# frozen_string_literal: true + +require "test_helper" +require "tmpdir" + +class OpenDALIOTest < ActiveSupport::TestCase + setup do + @root = Dir.mktmpdir + File.write( + "#{@root}/sample", + <<~EOF + Sample data for testing + A line after title + EOF + ) + @op = OpenDAL::Operator.new("fs", {"root" => @root}) + @io_read = @op.open("sample", "r") + @io_write = @op.open("sample_write", "w") + end + + teardown do + @io_read.close + @io_write.close + FileUtils.remove_entry(@root) if File.exist?(@root) + end + + test "#binmode? returns" do + assert_not @io_read.binmode? + end + + test "#binmode returns" do + assert_nothing_raised do + @io_read.binmode + end + end + + test "#close closes IO" do + assert_nothing_raised do + @io_read.close + end + end + + test "#close_read closes reader" do + assert_nothing_raised do + @io_read.close_read + end + end + + test "#close_write closes writer" do + assert_nothing_raised do + @io_read.close_write + end + end + + test "#closed? returns" do + assert_not @io_read.closed? + end + + test "#closed_read? returns" do + assert_not @io_read.closed_read? + end + + test "#closed_write? returns" do + assert @io_read.closed_write? 
+ end + + test "#read reads" do + result = @io_read.read(nil) + + assert_equal "Sample data for testing\nA line after title\n", result + # should be `assert_equal Encoding::UTF_8, result.encoding` + end + + test "#write writes" do + @io_write.write("This is a sentence.") + @io_write.close + assert_equal "This is a sentence.", File.read("#{@root}/sample_write") + end + + test "#readline reads a line" do + line = @io_read.readline + + assert_equal "Sample data for testing\n", line + # should be `assert_equal Encoding::UTF_8, line.encoding` + end + + test "#readlines reads all lines" do + lines = @io_read.readlines + + assert_equal ["Sample data for testing\n", "A line after title\n"], lines + # should be `assert_equal Encoding::UTF_8, lines.first.encoding` + end + + test "#tell returns position" do + assert_equal 0, @io_read.tell + end + + test "#pos= moves position" do + @io_read.pos = 5 + + assert_equal 5, @io_read.pos + end + + test "#rewind moves position to start" do + @io_read.pos = 5 + @io_read.rewind + + assert_equal 0, @io_read.pos + end + + test "#eof returns" do + assert_not @io_read.eof + end + + test "#eof? returns" do + @io_read.read(nil) + assert @io_read.eof? + end + + test "#length returns" do + assert_equal 43, @io_read.length + end + + test "#size returns" do + assert_equal 43, @io_read.size + end +end