Skip to content

Commit

Permalink
Prewarm the gandiva cache during loading time. Make the cache size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
projjal committed Aug 5, 2020
1 parent 5598bc6 commit c93f763
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ else()
BOOST_SOURCE_URL
# These are trimmed boost bundles we maintain.
# See cpp/build_support/trim-boost.sh
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://dl.bintray.com/boostorg/release/${ARROW_BOOST_BUILD_VERSION}/source/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://github.com/boostorg/boost/archive/boost-${ARROW_BOOST_BUILD_VERSION}.tar.gz"
# FIXME(ARROW-6407) automate uploading this archive to ensure it reflects
# our currently used packages and doesn't fall out of sync with
Expand Down
27 changes: 25 additions & 2 deletions cpp/src/gandiva/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <cstdlib>
#include <iostream>
#include <mutex>

#include "gandiva/lru_cache.h"
Expand All @@ -26,7 +28,12 @@ namespace gandiva {
template <class KeyType, typename ValueType>
class Cache {
public:
explicit Cache(size_t capacity = CACHE_SIZE) : cache_(capacity) {}
// Builds an LRU cache that holds at most `capacity` compiled modules.
explicit Cache(size_t capacity) : cache_(capacity) {
// NOTE(review): unconditional logging to stdout from a header-only template
// is noisy for library consumers — consider routing through a logger.
std::cout << "Creating gandiva cache with capacity: " << capacity << std::endl;
}

// Default-constructed caches take their capacity from the GANDIVA_CACHE_SIZE
// environment variable (see GetCapacity()), falling back to DEFAULT_CACHE_SIZE.
Cache() : Cache(GetCapacity()) {}

ValueType GetModule(KeyType cache_key) {
arrow::util::optional<ValueType> result;
mtx_.lock();
Expand All @@ -42,8 +49,24 @@ class Cache {
}

private:
// Determines the cache capacity used by the default constructor.
//
// Reads the GANDIVA_CACHE_SIZE environment variable and returns its value
// when it parses to a positive integer; otherwise (unset, non-numeric, zero
// or negative) returns DEFAULT_CACHE_SIZE.
static int GetCapacity() {
  int capacity;
  const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE");
  if (env_cache_size != nullptr) {
    capacity = std::atoi(env_cache_size);
    if (capacity <= 0) {
      // Diagnostics belong on stderr; stdout may be consumed by the
      // embedding application.
      std::cerr << "Invalid cache size provided. Using default cache size."
                << std::endl;
      capacity = DEFAULT_CACHE_SIZE;
    }
  } else {
    capacity = DEFAULT_CACHE_SIZE;
  }
  return capacity;
}

LruCache<KeyType, ValueType> cache_;
static const int CACHE_SIZE = 250;
static const int DEFAULT_CACHE_SIZE = 500;
std::mutex mtx_;
};
} // namespace gandiva
2 changes: 1 addition & 1 deletion cpp/src/gandiva/jni/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Types.pb.h")
set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/java")
add_subdirectory(../../../../java/gandiva ./java/gandiva)

set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF})
set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF} ${BOOST_FILESYSTEM_LIBRARY})
if(ARROW_BUILD_STATIC)
list(APPEND GANDIVA_LINK_LIBS gandiva_static)
else()
Expand Down
189 changes: 188 additions & 1 deletion cpp/src/gandiva/jni/jni_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <google/protobuf/io/coded_stream.h>

#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
Expand All @@ -29,6 +31,11 @@
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include <boost/filesystem.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include "Types.pb.h"
#include "gandiva/configuration.h"
#include "gandiva/filter.h"
Expand Down Expand Up @@ -62,11 +69,15 @@ using gandiva::ConfigurationBuilder;
using gandiva::FilterHolder;
using gandiva::ProjectorHolder;

namespace fs = boost::filesystem;

// forward declarations
NodePtr ProtoTypeToNode(const types::TreeNode& node);

static jint JNI_VERSION = JNI_VERSION_1_6;

static const char* kEnvPrewarmCacheDir = "GANDIVA_PREWARM_CACHE_DIR";

// extern refs - initialized for other modules.
jclass configuration_builder_class_;

Expand All @@ -82,6 +93,8 @@ static jfieldID vector_expander_ret_capacity_;
gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_;
gandiva::IdToModuleMap<std::shared_ptr<FilterHolder>> filter_modules_;

void PrewarmCache();

jint JNI_OnLoad(JavaVM* vm, void* reserved) {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
Expand Down Expand Up @@ -117,6 +130,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env->GetFieldID(vector_expander_ret_class_, "address", "J");
vector_expander_ret_capacity_ =
env->GetFieldID(vector_expander_ret_class_, "capacity", "J");

PrewarmCache();

return JNI_VERSION;
}

Expand Down Expand Up @@ -572,6 +588,171 @@ void releaseProjectorInput(jbyteArray schema_arr, jbyte* schema_bytes,
env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT);
}

void PrewarmCache() {
try {
auto prewarm_cache_dir = std::getenv(kEnvPrewarmCacheDir);
if (prewarm_cache_dir == nullptr) {
std::cout << "[Gandiva Cache Prewarm] No cache directory env variable is set. Skipping prewarming" << std::endl;
return;
}

fs::path path(prewarm_cache_dir);
std::cout << path.string() << "\n";
if (!fs::is_directory(path)) {
std::cerr << "[Gandiva Cache Prewarm] Prewarm cache dir env variable set is not a directory" << std::endl;
return;
}

fs::directory_iterator end_iter;
for (fs::directory_iterator dir_iter(path); dir_iter != end_iter; ++dir_iter) {
try {
if (fs::is_regular_file(dir_iter->status())) {
std::ifstream fin;
fin.open(dir_iter->path().string(), std::ios::binary);
if (!fin.is_open()) {
std::cerr << "[Gandiva Cache Prewarm] Failure opening warmup cache file" << std::endl;
continue;
}

fin.seekg(0, std::ios::end);
size_t remaining = fin.tellg();
fin.seekg(0, std::ios::beg);


int32_t schema_len;
if (remaining < sizeof(schema_len)) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
fin.read(reinterpret_cast<char*>(&schema_len), sizeof schema_len);
remaining -= sizeof(schema_len);

if (remaining < (size_t) schema_len) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
std::vector<char> schema_vec(schema_len);
fin.read(schema_vec.data(), schema_len);
remaining -= (size_t) schema_len;

int32_t exprs_len;
if (remaining < sizeof(exprs_len)) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
fin.read(reinterpret_cast<char*>(&exprs_len), sizeof exprs_len);
remaining -= sizeof(exprs_len);

if (remaining != (size_t) exprs_len) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
std::vector<char> exprs_vec(exprs_len);
fin.read(exprs_vec.data(), exprs_len);

fin.close();

uint8_t* schema_bytes = reinterpret_cast<uint8_t*>(schema_vec.data());
uint8_t* exprs_bytes = reinterpret_cast<uint8_t*>(exprs_vec.data());

std::shared_ptr<Projector> projector;
types::Schema schema;
types::ExpressionList exprs;
ExpressionVector expr_vector;
SchemaPtr schema_ptr;
FieldVector ret_types;
gandiva::Status status;
auto mode = gandiva::SelectionVector::MODE_NONE;

ConfigurationBuilder configuration_builder;
std::shared_ptr<Configuration> config = configuration_builder.build();

if (!ParseProtobuf(schema_bytes, schema_len, &schema)) {
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for schema" << std::endl;
continue;
}

if (!ParseProtobuf(exprs_bytes, exprs_len, &exprs)) {
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for expr list" << std::endl;
continue;
}

// convert types::Schema to arrow::Schema
schema_ptr = ProtoTypeToSchema(schema);
if (schema_ptr == nullptr) {
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf schema to arrow type" << std::endl;
continue;
}

// create Expression out of the list of exprs
for (int i = 0; i < exprs.exprs_size(); i++) {
ExpressionPtr root = ProtoTypeToExpression(exprs.exprs(i));

if (root == nullptr) {
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf expr to arrow type" << std::endl;
continue;
}

expr_vector.push_back(root);
ret_types.push_back(root->result());
}

status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);

if (!status.ok()) {
std::cerr << "[Gandiva Cache Prewarm] Failed to create a projector module";
continue;
}
std::cout << "[Gandiva Cache Prewarm] Built and cached a projector from the expression and schema in the file" << std::endl;
}
} catch (const std::exception& ex) {
std::cerr << "[Gandiva Cache Prewarm] " << dir_iter->path().filename() << " " << ex.what() << std::endl;
}
}

} catch (const std::exception& ex) {
std::cerr << "[Gandiva Cache Prewarm] Failed prewarming the cache: " << ex.what() << std::endl;
}
}

void CacheExpressionAndSchemaForPrewarmOnStartup(char* schema_bytes, int32_t schema_len, char* exprs_bytes, int32_t exprs_len) {
try {
auto env_path = std::getenv(kEnvPrewarmCacheDir);
if (env_path == nullptr) {
return;
}

fs::path path(env_path);
if (!fs::is_directory(path) && !fs::create_directories(path)) {
std::cerr << "[CacheExpression] Failed to create directory to save the schema and expressions" << std::endl;
return;
}

boost::uuids::uuid uuid = boost::uuids::random_generator()();

path /= boost::uuids::to_string(uuid);

std::ofstream fout;
fout.open(path.string(), std::ios::binary | std::ios::out);
if (!fout.is_open()) {
std::cerr << "[CacheExpression] Failed to open file to write the schema and expression" << std::endl;
return;
}

fout.write(reinterpret_cast<char*>(&schema_len), sizeof(schema_len));
fout.write(schema_bytes, schema_len);

fout.write(reinterpret_cast<char*>(&exprs_len), sizeof(exprs_len));
fout.write(exprs_bytes, exprs_len);

fout.close();

std::cout << "[CacheExpression] Cached schema and expression bytes to a file" << std::endl;
} catch (const std::exception& ex) {
std::cerr << "[CacheExpression] Failed to cache the expression to file " << ex.what() << std::endl;
}
}

JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_buildProjector(
JNIEnv* env, jobject obj, jbyteArray schema_arr, jbyteArray exprs_arr,
jint selection_vector_type, jlong configuration_id) {
Expand Down Expand Up @@ -648,7 +829,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
break;
}
// good to invoke the evaluator now
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);
bool cache_hit;
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector, &cache_hit);

if (!status.ok()) {
ss << "Failed to make LLVM module due to " << status.message() << "\n";
Expand All @@ -660,6 +842,11 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
holder = std::shared_ptr<ProjectorHolder>(
new ProjectorHolder(schema_ptr, ret_types, std::move(projector)));
module_id = projector_modules_.Insert(holder);

if (!cache_hit) {
CacheExpressionAndSchemaForPrewarmOnStartup(reinterpret_cast<char*>(schema_bytes), schema_len, reinterpret_cast<char*>(exprs_bytes), exprs_len);
}

releaseProjectorInput(schema_arr, schema_bytes, exprs_arr, exprs_bytes, env);
return module_id;

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/gandiva/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <list>
#include <iostream>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -107,6 +108,7 @@ class LruCache {

private:
void evict() {
std::cout << "Evicted a cache item from the lru cache" << std::endl;
// evict item from the end of most recently used list
typename list_type::iterator i = --lru_list_.end();
map_.erase(*i);
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/gandiva/projector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector) {
return Projector::Make(schema, exprs, selection_vector_mode, configuration,
projector, nullptr);
}

Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector, bool* cache_hit) {
ARROW_RETURN_IF(schema == nullptr, Status::Invalid("Schema cannot be null"));
ARROW_RETURN_IF(exprs.empty(), Status::Invalid("Expressions cannot be empty"));
ARROW_RETURN_IF(configuration == nullptr,
Expand All @@ -159,6 +167,9 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
std::shared_ptr<Projector> cached_projector = cache.GetModule(cache_key);
if (cached_projector != nullptr) {
*projector = cached_projector;
if (cache_hit != nullptr) {
*cache_hit = true;
}
return Status::OK();
}

Expand Down Expand Up @@ -187,6 +198,9 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
*projector = std::shared_ptr<Projector>(
new Projector(std::move(llvm_gen), schema, output_fields, configuration));
cache.PutModule(cache_key, *projector);
if (cache_hit != nullptr) {
*cache_hit = false;
}

return Status::OK();
}
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/gandiva/projector.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ class GANDIVA_EXPORT Projector {
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector);

/// Build a projector for the given schema to evaluate the vector of expressions.
/// Customize the projector with runtime configuration.
///
/// \param[in] schema schema for the record batches, and the expressions.
/// \param[in] exprs vector of expressions.
/// \param[in] selection_vector_mode mode of selection vector
/// \param[in] configuration run time configuration.
/// \param[out] projector the returned projector object
/// \param[out] cache_hit set to true on a cache hit, false otherwise; may be null if the caller does not need it
static Status Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector, bool* cache_hit);

/// Evaluate the specified record batch, and return the allocated and populated output
/// arrays. The output arrays will be allocated from the memory pool 'pool', and added
/// to the vector 'output'.
Expand Down

0 comments on commit c93f763

Please sign in to comment.