diff --git a/.github/workflows/ci_bindings_cpp.yml b/.github/workflows/ci_bindings_cpp.yml index 9c1376efac6b..9f1fafa703b4 100644 --- a/.github/workflows/ci_bindings_cpp.yml +++ b/.github/workflows/ci_bindings_cpp.yml @@ -41,13 +41,13 @@ permissions: jobs: test: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - name: Install dependencies run: | sudo apt-get update - sudo apt-get install libgtest-dev ninja-build libboost-all-dev valgrind doxygen + sudo apt-get install ninja-build valgrind doxygen - name: Setup Rust toolchain uses: ./.github/actions/setup @@ -69,3 +69,12 @@ jobs: cmake -GNinja -DOPENDAL_ENABLE_TESTING=ON .. ninja valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./opendal_cpp_test + + - name: Build Cpp binding with async && Run tests + working-directory: "bindings/cpp" + run: | + mkdir build-async + cd build-async + cmake -GNinja -DOPENDAL_DEV=ON -DOPENDAL_ENABLE_ASYNC=ON -DCMAKE_CXX_COMPILER=clang++-18 .. + ninja + ./opendal_cpp_test diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index 0d8ce89dfed6..b2f232c1d816 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -21,9 +21,7 @@ project(opendal-cpp LANGUAGES CXX) include(FetchContent) set(OPENDAL_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest, 'external' to fallback to find_package()") set(OPENDAL_BOOST_VERSION 1.86.0 CACHE STRING "version of Boost, 'external' to fallback to find_package()") - -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(OPENDAL_CPPCORO_VERSION a4ef65281814b18fdd1ac5457d3e219347ec6cb8 CACHE STRING "version of cppcoro") if (NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Debug) @@ -34,6 +32,18 @@ option(OPENDAL_ENABLE_DOCUMENTATION "Enable generating document for opendal" OFF option(OPENDAL_DOCS_ONLY "Only build documentation (dev only for quick ci)" OFF) option(OPENDAL_ENABLE_TESTING "Enable building test binary for opendal" OFF) option(OPENDAL_DEV "Enable dev mode" OFF) +option(OPENDAL_ENABLE_ASYNC "Enable async mode (requires C++20)" OFF) + +if(OPENDAL_ENABLE_ASYNC) + set(CMAKE_CXX_STANDARD 20) + + if (NOT ((CMAKE_CXX_COMPILER_ID STREQUAL "Clang") OR (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang"))) + message(FATAL_ERROR "currently C++ compiler must be clang for async mode") + endif() +else() + set(CMAKE_CXX_STANDARD 17) +endif() +set(CMAKE_CXX_STANDARD_REQUIRED ON) if (OPENDAL_DEV) set(OPENDAL_ENABLE_ADDRESS_SANITIZER ON) @@ -69,42 +79,48 @@ execute_process(COMMAND cargo locate-project --workspace --message-format plain string(REGEX REPLACE "/Cargo.toml\n$" "/target" CARGO_TARGET_DIR "${CARGO_TARGET_DIR}") set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml) set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs) -set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/lib.rs.cc) -set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/lib.rs.h) +list(APPEND RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/lib.rs.cc) +list(APPEND RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/lib.rs.h) +if (OPENDAL_ENABLE_ASYNC) + list(APPEND RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/async.rs.cc) + list(APPEND RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src/async.rs.h) +endif() if (CMAKE_BUILD_TYPE STREQUAL "Debug") set(RUST_LIB ${CARGO_TARGET_DIR}/debug/${CMAKE_STATIC_LIBRARY_PREFIX}opendal_cpp${CMAKE_STATIC_LIBRARY_SUFFIX}) else() set(RUST_LIB ${CARGO_TARGET_DIR}/release/${CMAKE_STATIC_LIBRARY_PREFIX}opendal_cpp${CMAKE_STATIC_LIBRARY_SUFFIX}) endif() set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include ${CARGO_TARGET_DIR}/cxxbridge/opendal-cpp/src) -file(GLOB_RECURSE CPP_SOURCE_FILE src/*.cpp) -file(GLOB_RECURSE CPP_HEADER_FILE include/*.hpp) +list(APPEND CPP_SOURCE_FILE src/opendal.cpp) +list(APPEND CPP_HEADER_FILE include/opendal.hpp) +if (OPENDAL_ENABLE_ASYNC) + list(APPEND CPP_SOURCE_FILE src/opendal_async.cpp) + list(APPEND CPP_HEADER_FILE include/opendal_async.hpp) +endif() -if (CMAKE_BUILD_TYPE STREQUAL "Debug") - add_custom_command( - OUTPUT ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE} - COMMAND cargo build --manifest-path ${CARGO_MANIFEST} - DEPENDS ${RUST_SOURCE_FILE} - USES_TERMINAL - COMMENT "Running cargo..." - ) -else() - add_custom_command( - OUTPUT ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE} - COMMAND cargo build --manifest-path ${CARGO_MANIFEST} --release - DEPENDS ${RUST_SOURCE_FILE} - USES_TERMINAL - COMMENT "Running cargo..." - ) +if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug") + list(APPEND CARGO_BUILD_FLAGS "--release") +endif() + +if (OPENDAL_ENABLE_ASYNC) + list(APPEND CARGO_BUILD_FLAGS "--features" "async") endif() +add_custom_target(cargo_build + COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS} + BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE} + DEPENDS ${RUST_SOURCE_FILE} + USES_TERMINAL + COMMENT "Running cargo..." +) + if(OPENDAL_BOOST_VERSION STREQUAL "external") find_package(Boost REQUIRED COMPONENTS date_time iostreams) else() # fetch Boost FetchContent_Declare( - Boost - URL https://github.com/boostorg/boost/releases/download/boost-${OPENDAL_BOOST_VERSION}/boost-${OPENDAL_BOOST_VERSION}-cmake.zip + Boost + URL https://github.com/boostorg/boost/releases/download/boost-${OPENDAL_BOOST_VERSION}/boost-${OPENDAL_BOOST_VERSION}-cmake.zip ) set(BOOST_INCLUDE_LIBRARIES date_time iostreams system) @@ -115,12 +131,17 @@ endif() add_library(opendal_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP}) target_sources(opendal_cpp PUBLIC ${CPP_HEADER_FILE}) target_sources(opendal_cpp PRIVATE ${RUST_HEADER_FILE}) -target_include_directories(opendal_cpp PUBLIC ${CPP_INCLUDE_DIR} ${Boost_INCLUDE_DIRS}) -target_link_libraries(opendal_cpp PUBLIC ${RUST_LIB}) -target_link_libraries(opendal_cpp PRIVATE ${CMAKE_DL_LIBS} Boost::date_time) +target_include_directories(opendal_cpp PUBLIC ${CPP_INCLUDE_DIR}) +if (OPENDAL_ENABLE_ASYNC) + target_include_directories(opendal_cpp PUBLIC ${CARGO_TARGET_DIR}/cxxbridge) + target_compile_options(opendal_cpp PUBLIC -include ${PROJECT_SOURCE_DIR}/include/async_defs.hpp) +endif() +target_link_libraries(opendal_cpp PUBLIC ${RUST_LIB} Boost::date_time Boost::iostreams) +target_link_libraries(opendal_cpp PRIVATE ${CMAKE_DL_LIBS}) set_target_properties(opendal_cpp PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR} ) +add_dependencies(opendal_cpp cargo_build) if (OPENDAL_ENABLE_ADDRESS_SANITIZER) target_compile_options(opendal_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1) @@ -156,11 +177,25 @@ if (OPENDAL_ENABLE_TESTING) FetchContent_MakeAvailable(googletest) endif() - file(GLOB_RECURSE TEST_SOURCE_FILE tests/*.cpp) + if (OPENDAL_ENABLE_ASYNC) + FetchContent_Declare( + cppcoro + URL https://github.com/andreasbuhr/cppcoro/archive/${OPENDAL_CPPCORO_VERSION}.zip + ) + FetchContent_MakeAvailable(cppcoro) + endif() + + list(APPEND TEST_SOURCE_FILE tests/basic_test.cpp) + if (OPENDAL_ENABLE_ASYNC) + list(APPEND TEST_SOURCE_FILE tests/async_test.cpp) + endif() add_executable(opendal_cpp_test ${TEST_SOURCE_FILE}) target_include_directories(opendal_cpp_test PUBLIC ${CPP_INCLUDE_DIR} ${GTEST_INCLUDE_DIRS}) target_link_libraries(opendal_cpp_test ${GTEST_LDFLAGS} GTest::gtest_main opendal_cpp) target_compile_options(opendal_cpp_test PRIVATE ${GTEST_CFLAGS}) + if (OPENDAL_ENABLE_ASYNC) + target_link_libraries(opendal_cpp_test cppcoro) + endif() # enable address sanitizers if (OPENDAL_ENABLE_ADDRESS_SANITIZER) diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml index 4f711b7e9053..2d82dc8c260b 100644 --- a/bindings/cpp/Cargo.toml +++ b/bindings/cpp/Cargo.toml @@ -34,6 +34,7 @@ crate-type = ["staticlib"] anyhow = "1.0" chrono = "0.4" cxx = "1.0" +cxx-async = { version = "0.1.2", optional = true } # this crate won't be published, we always use the local version opendal = { version = ">=0", path = "../../core", features = [ # These are default features before v0.46. TODO: change to optional features @@ -56,3 +57,6 @@ opendal = { version = ">=0", path = "../../core", features = [ [build-dependencies] cxx-build = "1.0" + +[features] +async = ["cxx-async", "cxx/c++20"] diff --git a/bindings/cpp/build.rs b/bindings/cpp/build.rs index 7d5d10f7b1b2..168102b0a221 100644 --- a/bindings/cpp/build.rs +++ b/bindings/cpp/build.rs @@ -15,8 +15,46 @@ // specific language governing permissions and limitations // under the License. +#[cfg(feature = "async")] +mod build_async { + use std::{ + env::var, + io, + path::{Path, PathBuf}, + }; + + fn copy_force, Q: AsRef>(src: P, dst: Q) -> io::Result<()> { + if dst.as_ref().exists() { + std::fs::remove_file(&dst)?; + } + + std::fs::copy(src, dst)?; + Ok(()) + } + + pub fn symlink_async_includes() { + let async_inc = var("DEP_CXX_ASYNC_INCLUDE").unwrap(); + let src_dir = PathBuf::from(async_inc).join("rust"); + + let prj_dir = var("CARGO_MANIFEST_DIR").unwrap(); + let dst_dir = PathBuf::from(prj_dir) + .join("target") + .join("cxxbridge") + .join("rust"); + + copy_force(src_dir.join("cxx_async.h"), dst_dir.join("cxx_async.h")).unwrap(); + } +} + fn main() { let _ = cxx_build::bridge("src/lib.rs"); + #[cfg(feature = "async")] + { + let _ = cxx_build::bridge("src/async.rs"); + build_async::symlink_async_includes(); + } println!("cargo:rerun-if-changed=src/lib.rs"); + #[cfg(feature = "async")] + println!("cargo:rerun-if-changed=src/async.rs"); } diff --git a/bindings/cpp/include/async_defs.hpp b/bindings/cpp/include/async_defs.hpp new file mode 100644 index 000000000000..c8a831d0ccc1 --- /dev/null +++ b/bindings/cpp/include/async_defs.hpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "rust/cxx.h" +#include "rust/cxx_async.h" + +CXXASYNC_DEFINE_FUTURE(rust::Vec, opendal, ffi, async, RustFutureRead); +CXXASYNC_DEFINE_FUTURE(void, opendal, ffi, async, RustFutureWrite); diff --git a/bindings/cpp/include/opendal_async.hpp b/bindings/cpp/include/opendal_async.hpp new file mode 100644 index 000000000000..45524df5bb06 --- /dev/null +++ b/bindings/cpp/include/opendal_async.hpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "async.rs.h" +#include "async_defs.hpp" + +namespace opendal::async { + +class Operator { + public: + Operator(std::string_view scheme, + const std::unordered_map &config = {}); + + // Disable copy and assign + Operator(const Operator &) = delete; + Operator &operator=(const Operator &) = delete; + + // Enable move + Operator(Operator &&) = default; + Operator &operator=(Operator &&) = default; + ~Operator() = default; + + using ReadFuture = opendal::ffi::async::RustFutureRead; + ReadFuture read(std::string_view path); + + using WriteFuture = opendal::ffi::async::RustFutureWrite; + WriteFuture write(std::string_view path, std::span data); + + private: + rust::Box operator_; +}; + +} // namespace opendal::async diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs new file mode 100644 index 000000000000..595006f269e0 --- /dev/null +++ b/bindings/cpp/src/async.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::Result; +use cxx_async::CxxAsyncException; +use opendal as od; +use std::collections::HashMap; +use std::future::Future; +use std::ops::Deref; +use std::str::FromStr; + +#[cxx::bridge(namespace = "opendal::ffi::async")] +mod ffi { + struct HashMapValue { + key: String, + value: String, + } + + // here we have to use raw pointers since: + // 1. cxx-async futures requires 'static lifetime (and it's hard to change for now) + // 2. cxx SharedPtr cannot accept Rust types as type parameters for now + pub struct OperatorPtr { + op: *const Operator, + } + + extern "Rust" { + type Operator; + + fn new_operator(scheme: &str, configs: Vec) -> Result>; + unsafe fn operator_read(op: OperatorPtr, path: String) -> RustFutureRead; + unsafe fn operator_write(op: OperatorPtr, path: String, bs: Vec) -> RustFutureWrite; + } + + extern "C++" { + type RustFutureRead = super::RustFutureRead; + type RustFutureWrite = super::RustFutureWrite; + } +} + +#[cxx_async::bridge(namespace = opendal::ffi::async)] +unsafe impl Future for RustFutureRead { + type Output = Vec; +} + +#[cxx_async::bridge(namespace = opendal::ffi::async)] +unsafe impl Future for RustFutureWrite { + type Output = (); +} + +pub struct Operator(od::Operator); + +fn new_operator(scheme: &str, configs: Vec) -> Result> { + let scheme = od::Scheme::from_str(scheme)?; + + let map: HashMap = configs + .into_iter() + .map(|value| (value.key, value.value)) + .collect(); + + let op = Box::new(Operator(od::Operator::via_iter(scheme, map)?)); + + Ok(op) +} + +impl Deref for ffi::OperatorPtr { + type Target = Operator; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.op } + } +} + +unsafe impl Send for ffi::OperatorPtr {} + +unsafe fn operator_read(op: ffi::OperatorPtr, path: String) -> RustFutureRead { + RustFutureRead::fallible(async move { + Ok((*op) + .0 + .read(&path) + .await + .map_err(|e| CxxAsyncException::new(e.to_string().into_boxed_str()))? + .to_vec()) + }) +} + +unsafe fn operator_write(op: ffi::OperatorPtr, path: String, bs: Vec) -> RustFutureWrite { + RustFutureWrite::fallible(async move { + Ok((*op) + .0 + .write(&path, bs) + .await + .map_err(|e| CxxAsyncException::new(e.to_string().into_boxed_str()))?) + }) +} diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index a37807c078e6..957134501cc7 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#[cfg(feature = "async")] +mod r#async; mod lister; mod reader; mod types; diff --git a/bindings/cpp/src/opendal_async.cpp b/bindings/cpp/src/opendal_async.cpp new file mode 100644 index 000000000000..6ec2dccad9f2 --- /dev/null +++ b/bindings/cpp/src/opendal_async.cpp @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "opendal_async.hpp" + +#include + +#include "async.rs.h" +#include "async_defs.hpp" + +#define RUST_STR(s) rust::Str(s.data(), s.size()) +#define RUST_STRING(s) rust::String(s.data(), s.size()) + +using namespace opendal::async; + +static rust::Box new_operator( + std::string_view scheme, + const std::unordered_map &config) { + auto rust_map = rust::Vec(); + rust_map.reserve(config.size()); + for (auto &[k, v] : config) { + rust_map.push_back({RUST_STRING(k), RUST_STRING(v)}); + } + + return opendal::ffi::async::new_operator(RUST_STR(scheme), rust_map); +} + +Operator::Operator(std::string_view scheme, + const std::unordered_map &config) + : operator_(new_operator(scheme, config)) {} + +Operator::ReadFuture Operator::read(std::string_view path) { + return opendal::ffi::async::operator_read( + opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path)); +} + +Operator::WriteFuture Operator::write(std::string_view path, + std::span data) { + rust::Vec vec; + std::copy(data.begin(), data.end(), std::back_inserter(vec)); + + return opendal::ffi::async::operator_write( + opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path), vec); +} diff --git a/bindings/cpp/tests/async_test.cpp b/bindings/cpp/tests/async_test.cpp new file mode 100644 index 000000000000..dc25c20685f3 --- /dev/null +++ b/bindings/cpp/tests/async_test.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "cppcoro/sync_wait.hpp" +#include "cppcoro/task.hpp" +#include "gtest/gtest.h" +#include "opendal_async.hpp" + +class AsyncOpendalTest : public ::testing::Test { + protected: + std::optional op; + + std::string scheme; + std::unordered_map config; + + // random number generator + std::mt19937 rng; + + void SetUp() override { + scheme = "memory"; + rng.seed(time(nullptr)); + + op = opendal::async::Operator(scheme, config); + } +}; + +TEST_F(AsyncOpendalTest, BasicTest) { + auto path = "test_path"; + std::vector data{1, 2, 3, 4, 5}; + cppcoro::sync_wait(op->write(path, data)); + auto res = cppcoro::sync_wait(op->read(path)); + for (size_t i = 0; i < data.size(); ++i) EXPECT_EQ(data[i], res[i]); + + path = "test_path2"; + cppcoro::sync_wait([&]() -> cppcoro::task { + co_await op->write(path, data); + auto res = co_await op->read(path); + for (size_t i = 0; i < data.size(); ++i) EXPECT_EQ(data[i], res[i]); + co_return; + }()); +}