diff --git a/.editorconfig b/.editorconfig index 7dfed389bee3..2111a1d093a2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -61,3 +61,6 @@ indent_size = 2 [*.{yaml,yml}] indent_size = 2 + +[*.rb] +indent_size = 2 diff --git a/.gitignore b/.gitignore index 5f15585d6716..d867d08e272c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # IDE and editor -.vscode .idea - +.vscode +!.vscode/settings.json **/target **/vendor dist/ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000000..8a903fd1a084 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "rust-analyzer.cargo.allTargets": true, + "rust-analyzer.cargo.features": "all", + "rust-analyzer.linkedProjects": [ + "${workspaceFolder}/core/Cargo.toml", + "${workspaceFolder}/bindings/python/Cargo.toml", + "${workspaceFolder}/bindings/java/Cargo.toml", + "${workspaceFolder}/bindings/nodejs/Cargo.toml", + ], + "java.compile.nullAnalysis.mode": "automatic" +} diff --git a/bindings/ruby/lib/opendal.rb b/bindings/ruby/lib/opendal.rb index 28bd98139472..5dbb0404cf70 100644 --- a/bindings/ruby/lib/opendal.rb +++ b/bindings/ruby/lib/opendal.rb @@ -17,5 +17,5 @@ # frozen_string_literal: true -require_relative "opendal_ruby/version" require_relative "opendal_ruby/opendal_ruby" +require_relative "opendal_ruby/io" diff --git a/bindings/ruby/lib/opendal_ruby/io.rb b/bindings/ruby/lib/opendal_ruby/io.rb new file mode 100644 index 000000000000..654e59bfb677 --- /dev/null +++ b/bindings/ruby/lib/opendal_ruby/io.rb @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# frozen_string_literal: true + +module OpenDAL + class IO + # Reads all lines from the stream into an array. + # Raises `EOFError` when the end of the file is reached. + def readlines + results = [] + + loop do + results << readline + rescue EOFError + break + end + + results + end + + # Rewinds the stream to the beginning. + def rewind + seek(0, ::IO::SEEK_SET) + end + + # Sets the file position to `new_position`. + def pos=(new_position) + seek(new_position, ::IO::SEEK_SET) + end + + alias_method :pos, :tell + + # Checks if the stream is at the end of the file. + def eof + position = tell + seek(0, ::IO::SEEK_END) + tell == position + end + + alias_method :eof?, :eof + + # Returns the total length of the stream. + def length + current_position = tell + seek(0, ::IO::SEEK_END) + tell.tap { self.pos = current_position } + end + + alias_method :size, :length + end +end diff --git a/bindings/ruby/lib/opendal_ruby/version.rb b/bindings/ruby/lib/opendal_ruby/version.rb deleted file mode 100644 index e5ff6578732d..000000000000 --- a/bindings/ruby/lib/opendal_ruby/version.rb +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# frozen_string_literal: true - -module OpenDAL - VERSION = "0.1.9" -end diff --git a/bindings/ruby/opendal.gemspec b/bindings/ruby/opendal.gemspec index df716d91f6c7..fdb9d121be9e 100644 --- a/bindings/ruby/opendal.gemspec +++ b/bindings/ruby/opendal.gemspec @@ -17,11 +17,20 @@ # frozen_string_literal: true -require_relative "lib/opendal_ruby/version" +require "json" Gem::Specification.new do |spec| spec.name = "opendal" - spec.version = OpenDAL::VERSION + # RubyGems integrates and expects `cargo`. + # Read more about [Gem::Ext::CargoBuilder](https://github.com/rubygems/rubygems/blob/v3.5.23/lib/rubygems/ext/cargo_builder.rb) + # + # OpenDAL relies on "version" in `Cargo.toml` for the release process. You can read this gem spec with: + # `bundle exec ruby -e 'puts Gem::Specification.load("opendal.gemspec")'` + # + # keep in sync the key "opendal-ruby" with `Rakefile`. + # + # uses `cargo` to extract the version. + spec.version = JSON.parse(`cargo metadata --format-version 1`.strip)["packages"].find { |p| p["name"] == "opendal-ruby" }["version"] spec.authors = ["OpenDAL Contributors"] spec.email = ["dev@opendal.apache.org"] diff --git a/bindings/ruby/src/io.rs b/bindings/ruby/src/io.rs new file mode 100644 index 000000000000..01a2daf4606e --- /dev/null +++ b/bindings/ruby/src/io.rs @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::collections::HashSet; +use std::io::BufRead; +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::io::Write; + +use magnus::class; +use magnus::method; +use magnus::prelude::*; +use magnus::Error; +use magnus::RModule; + +use crate::*; + +// `Io` is the rust implementation for `OpenDAL::IO`. `Io` follows similar Ruby IO classes, such as: +// - IO +// - StringIO +// +// `Io` is not exactly an `IO` but is unidirectional (either `Reader` or `Writer`). +// TODO: implement encoding. +#[magnus::wrap(class = "OpenDAL::IO", free_immediately, size)] +pub struct Io(RefCell); + +enum FileState { + Reader(ocore::StdReader, bool), // bool indicates binary mode + Writer(ocore::StdWriter, bool), // bool indicates binary mode + Closed, +} + +pub fn format_io_error(err: std::io::Error) -> Error { + Error::new(exception::runtime_error(), err.to_string()) +} + +impl Io { + /// Creates a new `OpenDAL::IO` object in Ruby. + /// + // @param ruby Ruby handle, required for exception handling. + // @param operator OpenDAL operator for file operations. + // @param path Path to the file. + // @param mode Mode string, e.g., "r", "w", or "rb". + // + // The mode must contain unique characters. Invalid or duplicate modes will raise an `ArgumentError`. + pub fn new( + ruby: &Ruby, + operator: ocore::BlockingOperator, + path: String, + mode: String, + ) -> Result { + let mut mode_flags = HashSet::new(); + let is_unique = mode.chars().all(|c| mode_flags.insert(c)); + if !is_unique { + return Err(Error::new( + ruby.exception_arg_error(), + format!("Invalid access mode {mode}"), + )); + } + + let binary_mode = mode_flags.contains(&'b'); + + if mode_flags.contains(&'r') { + Ok(Self(RefCell::new(FileState::Reader( + operator + .reader(&path) + .map_err(format_magnus_error)? + .into_std_read(..) + .map_err(format_magnus_error)?, + binary_mode, + )))) + } else if mode_flags.contains(&'w') { + Ok(Self(RefCell::new(FileState::Writer( + operator + .writer(&path) + .map_err(format_magnus_error)? + .into_std_write(), + binary_mode, + )))) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + format!("OpenDAL doesn't support mode: {mode}"), + )) + } + } + + /// Enables binary mode for the stream. + fn binary_mode(ruby: &Ruby, rb_self: &Self) -> Result<(), Error> { + let mut cell = rb_self.0.borrow_mut(); + match &mut *cell { + FileState::Reader(_, ref mut is_binary_mode) => { + *is_binary_mode = true; + Ok(()) + } + FileState::Writer(_, ref mut is_binary_mode) => { + *is_binary_mode = true; + Ok(()) + } + FileState::Closed => Err(Error::new(ruby.exception_io_error(), "closed stream")), + } + } + + /// Returns if the stream is on binary mode. + fn is_binary_mode(ruby: &Ruby, rb_self: &Self) -> Result { + match *rb_self.0.borrow() { + FileState::Reader(_, is_binary_mode) => Ok(is_binary_mode), + FileState::Writer(_, is_binary_mode) => Ok(is_binary_mode), + FileState::Closed => Err(Error::new(ruby.exception_io_error(), "closed stream")), + } + } + + /// Checks if the stream is in binary mode. + fn close(&self) -> Result<(), Error> { + // skips closing reader because `StdReader` doesn't have `close()`. + let mut cell = self.0.borrow_mut(); + if let FileState::Writer(writer, _) = &mut *cell { + writer.close().map_err(format_io_error)?; + } + *cell = FileState::Closed; + Ok(()) + } + + /// Closes the stream and transitions the state to `Closed`. + fn close_read(&self) -> Result<(), Error> { + *self.0.borrow_mut() = FileState::Closed; + Ok(()) + } + + /// Reads data from the stream. + fn close_write(&self) -> Result<(), Error> { + let mut cell = self.0.borrow_mut(); + if let FileState::Writer(writer, _) = &mut *cell { + writer.close().map_err(format_io_error)?; + } + *cell = FileState::Closed; + Ok(()) + } + + fn is_closed(&self) -> Result { + Ok(matches!(*self.0.borrow(), FileState::Closed)) + } + + fn is_closed_read(&self) -> Result { + Ok(!matches!(*self.0.borrow(), FileState::Reader(_, _))) + } + + fn is_closed_write(&self) -> Result { + Ok(!matches!(*self.0.borrow(), FileState::Writer(_, _))) + } +} + +impl Io { + /// Reads data from the stream. + /// TODO: + /// - support default parameters + /// - support encoding + /// + /// @param size The maximum number of bytes to read. Reads all data if `None`. + fn read(ruby: &Ruby, rb_self: &Self, size: Option) -> Result { + // FIXME: consider what to return exactly + if let FileState::Reader(reader, _) = &mut *rb_self.0.borrow_mut() { + let buffer = match size { + Some(size) => { + let mut bs = vec![0; size]; + let n = reader.read(&mut bs).map_err(format_io_error)?; + bs.truncate(n); + bs + } + None => { + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).map_err(format_io_error)?; + buffer + } + }; + + Ok(buffer.into()) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )) + } + } + + /// Reads a single line from the stream. + // TODO: extend readline with parameters + fn readline(ruby: &Ruby, rb_self: &Self) -> Result { + if let FileState::Reader(reader, _) = &mut *rb_self.0.borrow_mut() { + let mut buffer = String::new(); + let size = reader.read_line(&mut buffer).map_err(format_io_error)?; + if size == 0 { + return Err(Error::new( + ruby.exception_eof_error(), + "end of file reached", + )); + } + + Ok(buffer) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )) + } + } + + /// Writes data to the stream. + /// + /// @param bs The string data to write to the stream. + fn write(ruby: &Ruby, rb_self: &Self, bs: String) -> Result { + if let FileState::Writer(writer, _) = &mut *rb_self.0.borrow_mut() { + Ok(writer + .write_all(bs.as_bytes()) + .map(|_| bs.len()) + .map_err(format_io_error)?) + } else { + Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )) + } + } +} + +impl Io { + /// Moves the file position based on the offset and whence. + /// + /// @param offset The position offset. + /// @param whence The reference point: + /// - 0 = IO:SEEK_SET (Start) + /// - 1 = IO:SEEK_CUR (Current position) + /// - 2 = IO:SEEK_END (From the end) + fn seek(ruby: &Ruby, rb_self: &Self, offset: i64, whence: u8) -> Result { + match &mut *rb_self.0.borrow_mut() { + FileState::Reader(reader, _) => { + let whence = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(offset), + 2 => SeekFrom::End(offset), + _ => return Err(Error::new(ruby.exception_arg_error(), "invalid whence")), + }; + + reader.seek(whence).map_err(format_io_error)?; + + Ok(0) + } + FileState::Writer(_, _) => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )), + FileState::Closed => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )), + } + } + + /// Returns the current position of the file pointer in the stream. + fn tell(ruby: &Ruby, rb_self: &Self) -> Result { + match &mut *rb_self.0.borrow_mut() { + FileState::Reader(reader, _) => { + Ok(reader.stream_position().map_err(format_io_error)?) + } + FileState::Writer(_, _) => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on write only file.", + )), + FileState::Closed => Err(Error::new( + ruby.exception_runtime_error(), + "I/O operation failed for reading on closed file.", + )), + } + } + + // TODO: consider implement: + // - lineno + // - set_lineno + // - getc + // - putc + // - gets + // - puts +} + +/// Defines the `OpenDAL::IO` class in the given Ruby module and binds its methods. +/// +/// This function uses Magnus's built-in Ruby thread-safety features to define the +/// `OpenDAL::IO` class and its methods in the provided Ruby module (`gem_module`). +/// +/// # Ruby Object Lifetime and Safety +/// +/// Ruby objects can only exist in the Ruby heap and are tracked by Ruby's garbage collector (GC). +/// While we can allocate and store Ruby-related objects in the Rust heap, Magnus does not +/// automatically track such objects. Therefore, it is critical to work within Magnus's safety +/// guidelines when integrating Rust objects with Ruby. Read more in the Magnus documentation: +/// [Magnus Safety Documentation](https://github.com/matsadler/magnus#safety). +pub fn include(gem_module: &RModule) -> Result<(), Error> { + let class = gem_module.define_class("IO", class::object())?; + class.define_method("binmode", method!(Io::binary_mode, 0))?; + class.define_method("binmode?", method!(Io::is_binary_mode, 0))?; + class.define_method("close", method!(Io::close, 0))?; + class.define_method("close_read", method!(Io::close_read, 0))?; + class.define_method("close_write", method!(Io::close_write, 0))?; + class.define_method("closed?", method!(Io::is_closed, 0))?; + class.define_method("closed_read?", method!(Io::is_closed_read, 0))?; + class.define_method("closed_write?", method!(Io::is_closed_write, 0))?; + class.define_method("read", method!(Io::read, 1))?; + class.define_method("write", method!(Io::write, 1))?; + class.define_method("readline", method!(Io::readline, 0))?; + class.define_method("seek", method!(Io::seek, 2))?; + class.define_method("tell", method!(Io::tell, 0))?; + + Ok(()) +} diff --git a/bindings/ruby/src/lib.rs b/bindings/ruby/src/lib.rs index 5de42706927a..f47b964b4591 100644 --- a/bindings/ruby/src/lib.rs +++ b/bindings/ruby/src/lib.rs @@ -25,6 +25,7 @@ use magnus::Ruby; pub use ::opendal as ocore; mod capability; +mod io; mod metadata; mod operator; @@ -39,6 +40,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { let _ = operator::include(&gem_module); let _ = metadata::include(&gem_module); let _ = capability::include(&gem_module); + let _ = io::include(&gem_module); Ok(()) } diff --git a/bindings/ruby/src/operator.rs b/bindings/ruby/src/operator.rs index 997b55a607b6..45b11392a4c9 100644 --- a/bindings/ruby/src/operator.rs +++ b/bindings/ruby/src/operator.rs @@ -24,8 +24,10 @@ use magnus::prelude::*; use magnus::Error; use magnus::RModule; use magnus::RString; +use magnus::Ruby; use crate::capability::Capability; +use crate::io::Io; use crate::metadata::Metadata; use crate::*; @@ -34,39 +36,48 @@ use crate::*; struct Operator(ocore::BlockingOperator); impl Operator { - fn new(scheme: String, options: Option>) -> Result { + fn new( + ruby: &Ruby, + scheme: String, + options: Option>, + ) -> Result { let scheme = ocore::Scheme::from_str(&scheme) .map_err(|err| { ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") .set_source(err) }) - .map_err(format_magnus_error)?; + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))?; let options = options.unwrap_or_default(); let op = ocore::Operator::via_iter(scheme, options) - .map_err(format_magnus_error)? + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))? .blocking(); Ok(Operator(op)) } /// Reads the whole path into string. - fn read(&self, path: String) -> Result { - let buffer = self.0.read(&path).map_err(format_magnus_error)?; + fn read(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + let buffer = rb_self + .0 + .read(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))?; Ok(buffer.to_bytes()) } /// Writes string into given path. - fn write(&self, path: String, bs: RString) -> Result<(), Error> { - self.0 + fn write(ruby: &Ruby, rb_self: &Self, path: String, bs: RString) -> Result<(), Error> { + rb_self + .0 .write(&path, bs.to_bytes()) - .map_err(format_magnus_error) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Gets current path's metadata **without cache** directly. - fn stat(&self, path: String) -> Result { - self.0 + fn stat(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + rb_self + .0 .stat(&path) - .map_err(format_magnus_error) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) .map(Metadata::new) } @@ -78,33 +89,57 @@ impl Operator { /// Creates directory recursively similar as `mkdir -p` /// The ending path must be `/`. Otherwise, OpenDAL throws `NotADirectory` error. - fn create_dir(&self, path: String) -> Result<(), Error> { - self.0.create_dir(&path).map_err(format_magnus_error) + fn create_dir(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .create_dir(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Deletes given path - fn delete(&self, path: String) -> Result<(), Error> { - self.0.delete(&path).map_err(format_magnus_error) + fn delete(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .delete(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Returns if this path exists - fn exists(&self, path: String) -> Result { - self.0.exists(&path).map_err(format_magnus_error) + fn exists(ruby: &Ruby, rb_self: &Self, path: String) -> Result { + rb_self + .0 + .exists(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Renames a file from `from` to `to` - fn rename(&self, from: String, to: String) -> Result<(), Error> { - self.0.rename(&from, &to).map_err(format_magnus_error) + fn rename(ruby: &Ruby, rb_self: &Self, from: String, to: String) -> Result<(), Error> { + rb_self + .0 + .rename(&from, &to) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Removes the path and all nested directories and files recursively - fn remove_all(&self, path: String) -> Result<(), Error> { - self.0.remove_all(&path).map_err(format_magnus_error) + fn remove_all(ruby: &Ruby, rb_self: &Self, path: String) -> Result<(), Error> { + rb_self + .0 + .remove_all(&path) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } /// Copies a file from `from` to `to`. - fn copy(&self, from: String, to: String) -> Result<(), Error> { - self.0.copy(&from, &to).map_err(format_magnus_error) + fn copy(ruby: &Ruby, rb_self: &Self, from: String, to: String) -> Result<(), Error> { + rb_self + .0 + .copy(&from, &to) + .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) + } + + /// Opens a IO-like reader for the given path. + fn open(ruby: &Ruby, rb_self: &Self, path: String, mode: String) -> Result { + let operator = rb_self.0.clone(); + Ok(Io::new(&ruby, operator, path, mode)?) } } @@ -121,6 +156,7 @@ pub fn include(gem_module: &RModule) -> Result<(), Error> { class.define_method("rename", method!(Operator::rename, 2))?; class.define_method("remove_all", method!(Operator::remove_all, 1))?; class.define_method("copy", method!(Operator::copy, 2))?; + class.define_method("open", method!(Operator::open, 2))?; Ok(()) } diff --git a/bindings/ruby/test/blocking_op_test.rb b/bindings/ruby/test/blocking_op_test.rb index b6cee4d716c4..0bdb5bd85c5c 100644 --- a/bindings/ruby/test/blocking_op_test.rb +++ b/bindings/ruby/test/blocking_op_test.rb @@ -95,4 +95,12 @@ class OpenDalTest < ActiveSupport::TestCase assert File.exist?("#{@root}/sample") assert File.exist?("#{@root}/new_name") end + + test "opens an IO" do + io = @op.open("/sample", "rb") + + assert_not io.closed? + + io.close + end end diff --git a/bindings/ruby/test/io_test.rb b/bindings/ruby/test/io_test.rb new file mode 100644 index 000000000000..8a6060a0b453 --- /dev/null +++ b/bindings/ruby/test/io_test.rb @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# frozen_string_literal: true + +require "test_helper" +require "tmpdir" + +class IOTest < ActiveSupport::TestCase + setup do + @root = Dir.mktmpdir + File.write( + "#{@root}/sample", + <<~EOF + Sample data for testing + A line after title + EOF + ) + @op = OpenDAL::Operator.new("fs", {"root" => @root}) + @io_read = @op.open("sample", "r") + @io_write = @op.open("sample_write", "w") + end + + teardown do + @io_read.close + @io_write.close + FileUtils.remove_entry(@root) if File.exist?(@root) + end + + test "#binmode? returns" do + assert_not @io_read.binmode? + end + + test "#binmode returns" do + assert_nothing_raised do + @io_read.binmode + end + end + + test "#close closes IO" do + assert_nothing_raised do + @io_read.close + end + end + + test "#close_read closes reader" do + assert_nothing_raised do + @io_read.close_read + end + end + + test "#close_write closes writer" do + assert_nothing_raised do + @io_read.close_write + end + end + + test "#closed? returns" do + assert_not @io_read.closed? + end + + test "#closed_read? returns" do + assert_not @io_read.closed_read? + end + + test "#closed_write? returns" do + assert @io_read.closed_write? + end + + test "#read reads" do + result = @io_read.read(nil) + + assert_equal "Sample data for testing\nA line after title\n", result + # should be `assert_equal Encoding::UTF_8, result.encoding` + end + + test "#write writes" do + @io_write.write("This is a sentence.") + @io_write.close + assert_equal "This is a sentence.", File.read("#{@root}/sample_write") + end + + test "#readline reads a line" do + line = @io_read.readline + + assert_equal "Sample data for testing\n", line + # should be `assert_equal Encoding::UTF_8, line.encoding` + end + + test "#readlines reads all lines" do + lines = @io_read.readlines + + assert_equal ["Sample data for testing\n", "A line after title\n"], lines + # should be `assert_equal Encoding::UTF_8, lines.first.encoding` + end + + test "#tell returns position" do + assert_equal 0, @io_read.tell + end + + test "#pos= moves position" do + @io_read.pos = 5 + + assert_equal 5, @io_read.pos + end + + test "#rewind moves position to start" do + @io_read.pos = 5 + @io_read.rewind + + assert_equal 0, @io_read.pos + end + + test "#eof returns" do + assert_not @io_read.eof + end + + test "#eof? returns" do + @io_read.read(nil) + assert @io_read.eof? + end + + test "#length returns" do + assert_equal 43, @io_read.length + end + + test "#size returns" do + assert_equal 43, @io_read.size + end +end diff --git a/core/src/lib.rs b/core/src/lib.rs index 7a7f6eae18ff..8f989adf89a5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -144,7 +144,6 @@ mod tests { use std::mem::size_of; use super::*; - /// This is not a real test case. /// /// We assert our public structs here to make sure we don't introduce @@ -152,8 +151,8 @@ mod tests { #[test] fn assert_size() { assert_eq!(32, size_of::()); - assert_eq!(296, size_of::()); - assert_eq!(272, size_of::()); + assert_eq!(320, size_of::()); + assert_eq!(296, size_of::()); assert_eq!(1, size_of::()); assert_eq!(24, size_of::()); } diff --git a/core/src/raw/http_util/header.rs b/core/src/raw/http_util/header.rs index 6c8ba65dbab9..2da77a4b08dc 100644 --- a/core/src/raw/http_util/header.rs +++ b/core/src/raw/http_util/header.rs @@ -173,6 +173,10 @@ pub fn parse_into_metadata(path: &str, headers: &HeaderMap) -> Result m.set_content_type(v); } + if let Some(v) = parse_content_encoding(headers)? { + m.set_content_encoding(v); + } + if let Some(v) = parse_content_range(headers)? { m.set_content_range(v); } diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 7cdac88a588e..3b196d7b20ee 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -541,6 +541,7 @@ pub struct OpWrite { concurrent: usize, content_type: Option, content_disposition: Option, + content_encoding: Option, cache_control: Option, executor: Option, if_match: Option, @@ -598,6 +599,17 @@ impl OpWrite { self } + /// Get the content encoding from option + pub fn content_encoding(&self) -> Option<&str> { + self.content_encoding.as_deref() + } + + /// Set the content encoding of option + pub fn with_content_encoding(mut self, content_encoding: &str) -> Self { + self.content_encoding = Some(content_encoding.to_string()); + self + } + /// Get the cache control from option pub fn cache_control(&self) -> Option<&str> { self.cache_control.as_deref() diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 36f852da8ba6..28b33d37de43 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -27,7 +28,7 @@ use reqsign::HuaweicloudObsConfig; use reqsign::HuaweicloudObsCredentialLoader; use reqsign::HuaweicloudObsSigner; -use super::core::ObsCore; +use super::core::{constants, ObsCore}; use super::delete::ObsDeleter; use super::error::parse_error; use super::lister::ObsLister; @@ -173,7 +174,9 @@ impl Builder for ObsBuilder { let (endpoint, is_obs_default) = { let host = uri.host().unwrap_or_default().to_string(); - if host.starts_with("obs.") && host.ends_with(".myhuaweicloud.com") { + if host.starts_with("obs.") + && (host.ends_with(".myhuaweicloud.com") || host.ends_with(".huawei.com")) + { (format!("{bucket}.{host}"), true) } else { (host, false) @@ -282,6 +285,7 @@ impl Access for ObsBackend { } else { Some(usize::MAX) }, + write_with_user_metadata: true, delete: true, copy: true, @@ -304,12 +308,33 @@ impl Access for ObsBackend { async fn stat(&self, path: &str, args: OpStat) -> Result { let resp = self.core.obs_head_object(path, &args).await?; + let headers = resp.headers(); let status = resp.status(); // The response is very similar to azblob. match status { - StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), + StatusCode::OK => { + let mut meta = parse_into_metadata(path, headers)?; + let user_meta = headers + .iter() + .filter_map(|(name, _)| { + name.as_str() + .strip_prefix(constants::X_OBS_META_PREFIX) + .and_then(|stripped_key| { + parse_header_to_str(headers, name) + .unwrap_or(None) + .map(|val| (stripped_key.to_string(), val.to_string())) + }) + }) + .collect::>(); + + if !user_meta.is_empty() { + meta.with_user_metadata(user_meta); + } + + Ok(RpStat::new(meta)) + } StatusCode::NOT_FOUND if path.ends_with('/') => { Ok(RpStat::new(Metadata::new(EntryMode::DIR))) } diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs index e331f327c770..079c23e31a64 100644 --- a/core/src/services/obs/core.rs +++ b/core/src/services/obs/core.rs @@ -37,6 +37,10 @@ use serde::Serialize; use crate::raw::*; use crate::*; +pub mod constants { + pub const X_OBS_META_PREFIX: &str = "x-obs-meta-"; +} + pub struct ObsCore { pub bucket: String, pub root: String, @@ -167,6 +171,13 @@ impl ObsCore { req = req.header(CONTENT_TYPE, mime) } + // Set user metadata headers. + if let Some(user_metadata) = args.user_metadata() { + for (key, value) in user_metadata { + req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value) + } + } + let req = req.body(body).map_err(new_request_build_error)?; Ok(req) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 52f311415489..eb9a4370b089 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -923,6 +923,7 @@ impl Access for S3Backend { .set_name(&self.core.bucket) .set_native_capability(Capability { stat: true, + stat_has_content_encoding: true, stat_with_if_match: true, stat_with_if_none_match: true, stat_with_override_cache_control: !self.core.disable_stat_with_override, @@ -943,6 +944,7 @@ impl Access for S3Backend { write_can_multi: true, write_with_cache_control: true, write_with_content_type: true, + write_with_content_encoding: true, write_with_if_match: !self.core.disable_write_with_if_match, write_with_if_not_exists: true, write_with_user_metadata: true, diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index b69ef327ff69..19bdbfdf7209 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -31,6 +31,7 @@ use constants::X_AMZ_META_PREFIX; use http::header::HeaderName; use http::header::CACHE_CONTROL; use http::header::CONTENT_DISPOSITION; +use http::header::CONTENT_ENCODING; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; use http::header::HOST; @@ -452,6 +453,10 @@ impl S3Core { req = req.header(CONTENT_DISPOSITION, pos) } + if let Some(encoding) = args.content_encoding() { + req = req.header(CONTENT_ENCODING, encoding); + } + if let Some(cache_control) = args.cache_control() { req = req.header(CACHE_CONTROL, cache_control) } diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 2fdf56e87ce3..76bbc09611a5 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -90,6 +90,8 @@ pub struct Capability { pub stat_has_content_range: bool, /// Indicates whether content type information is available in stat response pub stat_has_content_type: bool, + /// Indicates whether content encoding information is available in stat response + pub stat_has_content_encoding: bool, /// Indicates whether entity tag is available in stat response pub stat_has_etag: bool, /// Indicates whether last modified timestamp is available in stat response @@ -126,6 +128,8 @@ pub struct Capability { pub write_with_content_type: bool, /// Indicates if Content-Disposition can be specified during write operations. pub write_with_content_disposition: bool, + /// Indicates if Content-Encoding can be specified during write operations. + pub write_with_content_encoding: bool, /// Indicates if Cache-Control can be specified during write operations. pub write_with_cache_control: bool, /// Indicates if conditional write operations using If-Match are supported. diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index 7314f25db5fd..c1d4155e1971 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -39,6 +39,7 @@ pub struct Metadata { content_md5: Option, content_range: Option, content_type: Option, + content_encoding: Option, etag: Option, last_modified: Option>, version: Option, @@ -56,6 +57,7 @@ impl Metadata { content_length: None, content_md5: None, content_type: None, + content_encoding: None, content_range: None, last_modified: None, etag: None, @@ -202,6 +204,17 @@ impl Metadata { self } + /// Content Encoding of this entry. + pub fn content_encoding(&self) -> Option<&str> { + self.content_encoding.as_deref() + } + + /// Set Content Encoding of this entry. + pub fn set_content_encoding(&mut self, v: &str) -> &mut Self { + self.content_encoding = Some(v.to_string()); + self + } + /// Content Range of this entry. /// /// Content Range is defined by [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.content-range). diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 24bba50d266c..de4fffe66822 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1407,6 +1407,38 @@ impl Operator { /// # } /// ``` /// + /// ## `content_encoding` + /// + /// Sets Content-Encoding header for this write request. + /// + /// ### Capability + /// + /// Check [`Capability::write_with_content_encoding`] before using this feature. + /// + /// ### Behavior + /// + /// - If supported, sets Content-Encoding as system metadata on the target file + /// - The value should follow HTTP Content-Encoding header format + /// - If not supported, the value will be ignored + /// + /// This operation allows specifying the content encoding for the written content. + /// + /// ## Example + /// + /// ```no_run + /// # use opendal::Result; + /// # use opendal::Operator; + /// use bytes::Bytes; + /// # async fn test(op: Operator) -> Result<()> { + /// let bs = b"hello, world!".to_vec(); + /// let _ = op + /// .write_with("path/to/file", bs) + /// .content_encoding("br") + /// .await?; + /// # Ok(()) + /// # } + /// ``` + /// /// ## `if_none_match` /// /// Sets an `if none match` condition with specified ETag for this write request. diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index de8f277ff284..2d119055794f 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -318,6 +318,11 @@ impl>> FutureWrite { self.map(|(args, options, bs)| (args.with_content_disposition(v), options, bs)) } + /// Set the content encoding of option + pub fn content_encoding(self, v: &str) -> Self { + self.map(|(args, options, bs)| (args.with_content_encoding(v), options, bs)) + } + /// Set the executor for this operation. pub fn executor(self, executor: Executor) -> Self { self.map(|(args, options, bs)| (args.with_executor(executor), options, bs)) diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index 4540925018d0..8e577450f232 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -44,6 +44,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_write_with_cache_control, test_write_with_content_type, test_write_with_content_disposition, + test_write_with_content_encoding, test_write_with_if_none_match, test_write_with_if_not_exists, test_write_with_if_match, @@ -211,6 +212,28 @@ pub async fn test_write_with_content_disposition(op: Operator) -> Result<()> { Ok(()) } +/// Write a single file with content encoding should succeed. +pub async fn test_write_with_content_encoding(op: Operator) -> Result<()> { + if !op.info().full_capability().write_with_content_encoding { + return Ok(()); + } + + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + let target_content_encoding = "gzip"; + op.write_with(&path, content) + .content_encoding(target_content_encoding) + .await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!( + meta.content_encoding() + .expect("content encoding must exist"), + target_content_encoding + ); + Ok(()) +} + /// write a single file with user defined metadata should succeed. pub async fn test_write_with_user_metadata(op: Operator) -> Result<()> { if !op.info().full_capability().write_with_user_metadata {