diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 9a842332f942a..3f74c9d936b0f 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -276,8 +276,8 @@ else() BOOST_SOURCE_URL # These are trimmed boost bundles we maintain. # See cpp/build_support/trim-boost.sh - "https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz" "https://dl.bintray.com/boostorg/release/${ARROW_BOOST_BUILD_VERSION}/source/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz" + "https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz" "https://github.com/boostorg/boost/archive/boost-${ARROW_BOOST_BUILD_VERSION}.tar.gz" # FIXME(ARROW-6407) automate uploading this archive to ensure it reflects # our currently used packages and doesn't fall out of sync with diff --git a/cpp/src/gandiva/cache.h b/cpp/src/gandiva/cache.h index 83ffa9b9a64aa..5d2647b35f859 100644 --- a/cpp/src/gandiva/cache.h +++ b/cpp/src/gandiva/cache.h @@ -17,6 +17,8 @@ #pragma once +#include +#include #include #include "gandiva/lru_cache.h" @@ -26,7 +28,12 @@ namespace gandiva { template class Cache { public: - explicit Cache(size_t capacity = CACHE_SIZE) : cache_(capacity) {} + explicit Cache(size_t capacity) : cache_(capacity) { + std::cout << "Creating gandiva cache with capacity: " << capacity << std::endl; + } + + Cache() : Cache(GetCapacity()) {} + ValueType GetModule(KeyType cache_key) { arrow::util::optional result; mtx_.lock(); @@ -42,8 +49,24 @@ class Cache { } private: + static int GetCapacity() { + int capacity; + const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE"); + if (env_cache_size != nullptr) { + capacity = std::atoi(env_cache_size); + if (capacity <= 0) { + std::cout << "Invalid cache size provided. Using default cache size." + << std::endl; + capacity = DEFAULT_CACHE_SIZE; + } + } else { + capacity = DEFAULT_CACHE_SIZE; + } + return capacity; + } + LruCache cache_; - static const int CACHE_SIZE = 250; + static const int DEFAULT_CACHE_SIZE = 500; std::mutex mtx_; }; } // namespace gandiva diff --git a/cpp/src/gandiva/jni/CMakeLists.txt b/cpp/src/gandiva/jni/CMakeLists.txt index 24ceb63559176..aa9c53a1c3334 100644 --- a/cpp/src/gandiva/jni/CMakeLists.txt +++ b/cpp/src/gandiva/jni/CMakeLists.txt @@ -54,7 +54,7 @@ set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Types.pb.h") set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/java") add_subdirectory(../../../../java/gandiva ./java/gandiva) -set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF}) +set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF} ${BOOST_FILESYSTEM_LIBRARY}) if(ARROW_BUILD_STATIC) list(APPEND GANDIVA_LINK_LIBS gandiva_static) else() diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc index e09daf6a48f1a..5b301e62d382e 100644 --- a/cpp/src/gandiva/jni/jni_common.cc +++ b/cpp/src/gandiva/jni/jni_common.cc @@ -15,8 +15,17 @@ // specific language governing permissions and limitations // under the License. +#include +#include +#include #include +#include +#include +#include +#include +#include +#include #include #include #include @@ -25,10 +34,6 @@ #include #include -#include -#include -#include - #include "Types.pb.h" #include "gandiva/configuration.h" #include "gandiva/filter.h" @@ -62,11 +67,15 @@ using gandiva::ConfigurationBuilder; using gandiva::FilterHolder; using gandiva::ProjectorHolder; +namespace fs = boost::filesystem; + // forward declarations NodePtr ProtoTypeToNode(const types::TreeNode& node); static jint JNI_VERSION = JNI_VERSION_1_6; +static const char* kEnvPrewarmCacheDir = "GANDIVA_PREWARM_CACHE_DIR"; + // extern refs - initialized for other modules. jclass configuration_builder_class_; @@ -82,6 +91,8 @@ static jfieldID vector_expander_ret_capacity_; gandiva::IdToModuleMap> projector_modules_; gandiva::IdToModuleMap> filter_modules_; +void PrewarmCache(); + jint JNI_OnLoad(JavaVM* vm, void* reserved) { JNIEnv* env; if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { @@ -117,6 +128,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env->GetFieldID(vector_expander_ret_class_, "address", "J"); vector_expander_ret_capacity_ = env->GetFieldID(vector_expander_ret_class_, "capacity", "J"); + + PrewarmCache(); + return JNI_VERSION; } @@ -572,6 +586,192 @@ void releaseProjectorInput(jbyteArray schema_arr, jbyte* schema_bytes, env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT); } +void PrewarmCache() { + try { + auto prewarm_cache_dir = std::getenv(kEnvPrewarmCacheDir); + if (prewarm_cache_dir == nullptr) { + std::cout << "[Gandiva Cache Prewarm] No cache directory env variable is set. " + "Skipping prewarming" + << std::endl; + return; + } + + fs::path path(prewarm_cache_dir); + std::cout << path.string() << "\n"; + if (!fs::is_directory(path)) { + std::cerr << "[Gandiva Cache Prewarm] Prewarm cache dir env variable set is not a " + "directory" + << std::endl; + return; + } + + fs::directory_iterator end_iter; + for (fs::directory_iterator dir_iter(path); dir_iter != end_iter; ++dir_iter) { + try { + if (fs::is_regular_file(dir_iter->status())) { + std::ifstream fin; + fin.open(dir_iter->path().string(), std::ios::binary); + if (!fin.is_open()) { + std::cerr << "[Gandiva Cache Prewarm] Failure opening warmup cache file" + << std::endl; + continue; + } + + fin.seekg(0, std::ios::end); + size_t remaining = fin.tellg(); + fin.seekg(0, std::ios::beg); + + int32_t schema_len; + if (remaining < sizeof(schema_len)) { + std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl; + continue; + } + fin.read(reinterpret_cast(&schema_len), sizeof schema_len); + remaining -= sizeof(schema_len); + + if (remaining < (size_t)schema_len) { + std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl; + continue; + } + std::vector schema_vec(schema_len); + fin.read(schema_vec.data(), schema_len); + remaining -= (size_t)schema_len; + + int32_t exprs_len; + if (remaining < sizeof(exprs_len)) { + std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl; + continue; + } + fin.read(reinterpret_cast(&exprs_len), sizeof exprs_len); + remaining -= sizeof(exprs_len); + + if (remaining != (size_t)exprs_len) { + std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl; + continue; + } + std::vector exprs_vec(exprs_len); + fin.read(exprs_vec.data(), exprs_len); + + fin.close(); + + uint8_t* schema_bytes = reinterpret_cast(schema_vec.data()); + uint8_t* exprs_bytes = reinterpret_cast(exprs_vec.data()); + + std::shared_ptr projector; + types::Schema schema; + types::ExpressionList exprs; + ExpressionVector expr_vector; + SchemaPtr schema_ptr; + FieldVector ret_types; + gandiva::Status status; + auto mode = gandiva::SelectionVector::MODE_NONE; + + ConfigurationBuilder configuration_builder; + std::shared_ptr config = configuration_builder.build(); + + if (!ParseProtobuf(schema_bytes, schema_len, &schema)) { + std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for schema" + << std::endl; + continue; + } + + if (!ParseProtobuf(exprs_bytes, exprs_len, &exprs)) { + std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for expr list" + << std::endl; + continue; + } + + // convert types::Schema to arrow::Schema + schema_ptr = ProtoTypeToSchema(schema); + if (schema_ptr == nullptr) { + std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf schema to " + "arrow type" + << std::endl; + continue; + } + + // create Expression out of the list of exprs + for (int i = 0; i < exprs.exprs_size(); i++) { + ExpressionPtr root = ProtoTypeToExpression(exprs.exprs(i)); + + if (root == nullptr) { + std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf expr to " + "arrow type" + << std::endl; + continue; + } + + expr_vector.push_back(root); + ret_types.push_back(root->result()); + } + + status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector); + + if (!status.ok()) { + std::cerr << "[Gandiva Cache Prewarm] Failed to create a projector module"; + continue; + } + std::cout << "[Gandiva Cache Prewarm] Built and cached a projector from the " + "expression and schema in the file" + << std::endl; + } + } catch (const std::exception& ex) { + std::cerr << "[Gandiva Cache Prewarm] " << dir_iter->path().filename() << " " + << ex.what() << std::endl; + } + } + + } catch (const std::exception& ex) { + std::cerr << "[Gandiva Cache Prewarm] Failed prewarming the cache: " << ex.what() + << std::endl; + } +} + +void CacheExpressionAndSchemaForPrewarmOnStartup(char* schema_bytes, int32_t schema_len, + char* exprs_bytes, int32_t exprs_len) { + try { + auto env_path = std::getenv(kEnvPrewarmCacheDir); + if (env_path == nullptr) { + return; + } + + fs::path path(env_path); + if (!fs::is_directory(path) && !fs::create_directories(path)) { + std::cerr << "[CacheExpression] Failed to create directory to save the schema and " + "expressions" + << std::endl; + return; + } + + boost::uuids::uuid uuid = boost::uuids::random_generator()(); + + path /= boost::uuids::to_string(uuid); + + std::ofstream fout; + fout.open(path.string(), std::ios::binary | std::ios::out); + if (!fout.is_open()) { + std::cerr + << "[CacheExpression] Failed to open file to write the schema and expression" + << std::endl; + return; + } + + fout.write(reinterpret_cast(&schema_len), sizeof(schema_len)); + fout.write(schema_bytes, schema_len); + + fout.write(reinterpret_cast(&exprs_len), sizeof(exprs_len)); + fout.write(exprs_bytes, exprs_len); + + fout.close(); + + std::cout << "[CacheExpression] Cached schema and expression bytes to a file" + << std::endl; + } catch (const std::exception& ex) { + std::cerr << "[CacheExpression] Failed to cache the expression to file " << ex.what() + << std::endl; + } +} + JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_buildProjector( JNIEnv* env, jobject obj, jbyteArray schema_arr, jbyteArray exprs_arr, jint selection_vector_type, jlong configuration_id) { @@ -648,7 +848,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build break; } // good to invoke the evaluator now - status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector); + bool cache_hit; + status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector, &cache_hit); if (!status.ok()) { ss << "Failed to make LLVM module due to " << status.message() << "\n"; @@ -660,6 +861,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build holder = std::shared_ptr( new ProjectorHolder(schema_ptr, ret_types, std::move(projector))); module_id = projector_modules_.Insert(holder); + + if (!cache_hit) { + CacheExpressionAndSchemaForPrewarmOnStartup( + reinterpret_cast(schema_bytes), schema_len, + reinterpret_cast(exprs_bytes), exprs_len); + } + releaseProjectorInput(schema_arr, schema_bytes, exprs_arr, exprs_bytes, env); return module_id; diff --git a/cpp/src/gandiva/lru_cache.h b/cpp/src/gandiva/lru_cache.h index 6602116b0a06b..cd2400f36c09e 100644 --- a/cpp/src/gandiva/lru_cache.h +++ b/cpp/src/gandiva/lru_cache.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -107,6 +108,7 @@ class LruCache { private: void evict() { + std::cout << "Evicted a cache item from the lru cache" << std::endl; // evict item from the end of most recently used list typename list_type::iterator i = --lru_list_.end(); map_.erase(*i); diff --git a/cpp/src/gandiva/projector.cc b/cpp/src/gandiva/projector.cc index 734720c64c9ad..682324e260c48 100644 --- a/cpp/src/gandiva/projector.cc +++ b/cpp/src/gandiva/projector.cc @@ -24,7 +24,6 @@ #include "arrow/util/hash_util.h" #include "arrow/util/logging.h" - #include "gandiva/cache.h" #include "gandiva/expr_validator.h" #include "gandiva/llvm_generator.h" @@ -148,6 +147,14 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, SelectionVector::Mode selection_vector_mode, std::shared_ptr configuration, std::shared_ptr* projector) { + return Projector::Make(schema, exprs, selection_vector_mode, configuration, projector, + nullptr); +} + +Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, + SelectionVector::Mode selection_vector_mode, + std::shared_ptr configuration, + std::shared_ptr* projector, bool* cache_hit) { ARROW_RETURN_IF(schema == nullptr, Status::Invalid("Schema cannot be null")); ARROW_RETURN_IF(exprs.empty(), Status::Invalid("Expressions cannot be empty")); ARROW_RETURN_IF(configuration == nullptr, @@ -159,6 +166,9 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, std::shared_ptr cached_projector = cache.GetModule(cache_key); if (cached_projector != nullptr) { *projector = cached_projector; + if (cache_hit != nullptr) { + *cache_hit = true; + } return Status::OK(); } @@ -187,6 +197,9 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, *projector = std::shared_ptr( new Projector(std::move(llvm_gen), schema, output_fields, configuration)); cache.PutModule(cache_key, *projector); + if (cache_hit != nullptr) { + *cache_hit = false; + } return Status::OK(); } diff --git a/cpp/src/gandiva/projector.h b/cpp/src/gandiva/projector.h index 20b36c9d883cd..657c454628390 100644 --- a/cpp/src/gandiva/projector.h +++ b/cpp/src/gandiva/projector.h @@ -23,7 +23,6 @@ #include #include "arrow/status.h" - #include "gandiva/arrow.h" #include "gandiva/configuration.h" #include "gandiva/expression.h" @@ -77,6 +76,20 @@ class GANDIVA_EXPORT Projector { std::shared_ptr configuration, std::shared_ptr* projector); + /// Build a projector for the given schema to evaluate the vector of expressions. + /// Customize the projector with runtime configuration. + /// + /// \param[in] schema schema for the record batches, and the expressions. + /// \param[in] exprs vector of expressions. + /// \param[in] selection_vector_mode mode of selection vector + /// \param[in] configuration run time configuration. + /// \param[out] projector the returned projector object + /// \param[out] cache_hit returns true if there was a cache hit + static Status Make(SchemaPtr schema, const ExpressionVector& exprs, + SelectionVector::Mode selection_vector_mode, + std::shared_ptr configuration, + std::shared_ptr* projector, bool* cache_hit); + /// Evaluate the specified record batch, and return the allocated and populated output /// arrays. The output arrays will be allocated from the memory pool 'pool', and added /// to the vector 'output'. diff --git a/dev/tasks/gandiva-jars/build-cpp-linux.sh b/dev/tasks/gandiva-jars/build-cpp-linux.sh index 61c82495af37a..685687aa6a40b 100755 --- a/dev/tasks/gandiva-jars/build-cpp-linux.sh +++ b/dev/tasks/gandiva-jars/build-cpp-linux.sh @@ -34,6 +34,7 @@ pushd "${ARROW_BUILD_DIR}" PATH="${CPYTHON_PATH}/bin:${PATH}" cmake -DCMAKE_BUILD_TYPE=Release \ + -DBOOST_SOURCE=BUNDLED \ -DARROW_DEPENDENCY_SOURCE="SYSTEM" \ -DZLIB_ROOT=/usr/local \ -DCMAKE_INSTALL_PREFIX=/arrow-dist \