Skip to content

Commit

Permalink
[GLUTEN-7418][VL] Add checks for allocation failures and initialize v…
Browse files Browse the repository at this point in the history
…ariables (apache#7419)

Closes apache#7418
  • Loading branch information
majetideepak authored Oct 9, 2024
1 parent a4d82d5 commit 411b317
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 33 deletions.
6 changes: 3 additions & 3 deletions cpp/core/jni/JniCommon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void gluten::JniCommonState::close() {
if (closed_) {
return;
}
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->DeleteGlobalRef(runtimeAwareClass_);
closed_ = true;
Expand Down Expand Up @@ -94,15 +94,15 @@ gluten::JniColumnarBatchIterator::JniColumnarBatchIterator(
}

gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->DeleteGlobalRef(jColumnarBatchItr_);
env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
vm_->DetachCurrentThread();
}

std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/jni/JniError.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void gluten::JniErrorState::close() {
if (closed_) {
return;
}
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->DeleteGlobalRef(glutenExceptionClass_);
env->DeleteGlobalRef(ioExceptionClass_);
Expand Down
14 changes: 13 additions & 1 deletion cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,36 @@ void ListenableMemoryAllocator::updateUsage(int64_t size) {

bool StdMemoryAllocator::allocate(int64_t size, void** out) {
*out = std::malloc(size);
if (*out == nullptr) {
return false;
}
bytes_ += size;
return true;
}

bool StdMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t size, void** out) {
*out = std::calloc(nmemb, size);
if (*out == nullptr) {
return false;
}
bytes_ += size;
return true;
}

bool StdMemoryAllocator::allocateAligned(uint64_t alignment, int64_t size, void** out) {
*out = aligned_alloc(alignment, size);
if (*out == nullptr) {
return false;
}
bytes_ += size;
return true;
}

bool StdMemoryAllocator::reallocate(void* p, int64_t size, int64_t newSize, void** out) {
*out = std::realloc(p, newSize);
if (*out == nullptr) {
return false;
}
bytes_ += (newSize - size);
return true;
}
Expand All @@ -141,7 +153,7 @@ bool StdMemoryAllocator::reallocateAligned(void* p, uint64_t alignment, int64_t
}
}
void* reallocatedP = std::aligned_alloc(alignment, newSize);
if (!reallocatedP) {
if (reallocatedP == nullptr) {
return false;
}
memcpy(reallocatedP, p, std::min(size, newSize));
Expand Down
52 changes: 26 additions & 26 deletions cpp/velox/jni/JniFileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ std::string_view removePathSchema(std::string_view path) {
class JniReadFile : public facebook::velox::ReadFile {
public:
explicit JniReadFile(jobject obj) {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
obj_ = env->NewGlobalRef(obj);
checkException(env);
Expand All @@ -75,7 +75,7 @@ class JniReadFile : public facebook::velox::ReadFile {
~JniReadFile() override {
try {
close0();
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->DeleteGlobalRef(obj_);
checkException(env);
Expand All @@ -85,7 +85,7 @@ class JniReadFile : public facebook::velox::ReadFile {
}

std::string_view pread(uint64_t offset, uint64_t length, void* buf) const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(
obj_, jniReadFilePread, static_cast<jlong>(offset), static_cast<jlong>(length), reinterpret_cast<jlong>(buf));
Expand All @@ -94,23 +94,23 @@ class JniReadFile : public facebook::velox::ReadFile {
}

bool shouldCoalesce() const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jboolean out = env->CallBooleanMethod(obj_, jniReadFileShouldCoalesce);
checkException(env);
return out;
}

uint64_t size() const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jlong out = env->CallLongMethod(obj_, jniReadFileSize);
checkException(env);
return static_cast<uint64_t>(out);
}

uint64_t memoryUsage() const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jlong out = env->CallLongMethod(obj_, jniReadFileMemoryUsage);
checkException(env);
Expand All @@ -122,7 +122,7 @@ class JniReadFile : public facebook::velox::ReadFile {
}

uint64_t getNaturalReadSize() const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jlong out = env->CallLongMethod(obj_, jniReadFileGetNaturalReadSize);
checkException(env);
Expand All @@ -131,7 +131,7 @@ class JniReadFile : public facebook::velox::ReadFile {

private:
void close0() {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniReadFileClose);
checkException(env);
Expand All @@ -143,7 +143,7 @@ class JniReadFile : public facebook::velox::ReadFile {
class JniWriteFile : public facebook::velox::WriteFile {
public:
explicit JniWriteFile(jobject obj) {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
obj_ = env->NewGlobalRef(obj);
checkException(env);
Expand All @@ -152,7 +152,7 @@ class JniWriteFile : public facebook::velox::WriteFile {
~JniWriteFile() override {
try {
close0();
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->DeleteGlobalRef(obj_);
checkException(env);
Expand All @@ -162,7 +162,7 @@ class JniWriteFile : public facebook::velox::WriteFile {
}

void append(std::string_view data) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
const void* bytes = data.data();
unsigned long len = data.size();
Expand All @@ -171,7 +171,7 @@ class JniWriteFile : public facebook::velox::WriteFile {
}

void flush() override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniWriteFileFlush);
checkException(env);
Expand All @@ -182,7 +182,7 @@ class JniWriteFile : public facebook::velox::WriteFile {
}

uint64_t size() const override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jlong out = env->CallLongMethod(obj_, jniWriteFileSize);
checkException(env);
Expand All @@ -191,7 +191,7 @@ class JniWriteFile : public facebook::velox::WriteFile {

private:
void close0() {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniWriteFileClose);
checkException(env);
Expand Down Expand Up @@ -263,15 +263,15 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
public:
explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::config::ConfigBase> config)
: FileSystem(config) {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
obj_ = env->NewGlobalRef(obj);
checkException(env);
}

~JniFileSystem() override {
try {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->DeleteGlobalRef(obj_);
checkException(env);
Expand All @@ -290,7 +290,7 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
GLUTEN_CHECK(
options.values.empty(),
"JniFileSystem::openFileForRead: file options is not empty, this is not currently supported");
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForRead, createJString(env, path));
checkException(env);
Expand All @@ -304,7 +304,7 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
GLUTEN_CHECK(
options.values.empty(),
"JniFileSystem::openFileForWrite: file options is not empty, this is not currently supported");
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForWrite, createJString(env, path));
checkException(env);
Expand All @@ -313,29 +313,29 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
}

void remove(std::string_view path) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniFileSystemRemove, createJString(env, path));
checkException(env);
}

void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniFileSystemRename, createJString(env, oldPath), createJString(env, newPath), overwrite);
checkException(env);
}

bool exists(std::string_view path) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
bool out = env->CallBooleanMethod(obj_, jniFileSystemExists, createJString(env, path));
checkException(env);
return out;
}

std::vector<std::string> list(std::string_view path) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
std::vector<std::string> out;
jobjectArray jarray = (jobjectArray)env->CallObjectMethod(obj_, jniFileSystemList, createJString(env, path));
Expand All @@ -350,21 +350,21 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
}

void mkdir(std::string_view path) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniFileSystemMkdir, createJString(env, path));
checkException(env);
}

void rmdir(std::string_view path) override {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
env->CallVoidMethod(obj_, jniFileSystemRmdir, createJString(env, path));
checkException(env);
}

static bool isCapableForNewFile(uint64_t size) {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
bool out = env->CallStaticBooleanMethod(jniFileSystemClass, jniIsCapableForNewFile, static_cast<jlong>(size));
checkException(env);
Expand All @@ -379,7 +379,7 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::config::ConfigBase>, std::string_view)>
fileSystemGenerator() {
return [](std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) {
JNIEnv* env;
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem);
checkException(env);
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(

std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
if (hasComplexType_) {
uint32_t numRows;
uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
if (arrowBuffers.empty()) {
Expand Down Expand Up @@ -399,7 +399,7 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
}

while (cachedRows_ < batchSize_) {
uint32_t numRows;
uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));

Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/udf/UdfLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ std::unordered_set<std::shared_ptr<UdfLoader::UdfSignature>> UdfLoader::getRegis
int numUdf = getNumUdf();
// allocate
UdfEntry* udfEntries = static_cast<UdfEntry*>(malloc(sizeof(UdfEntry) * numUdf));
if (udfEntries == nullptr) {
throw gluten::GlutenException("malloc failed");
}

void* getUdfEntriesSym = loadSymFromLibrary(handle, libPath, GLUTEN_TOSTRING(GLUTEN_GET_UDF_ENTRIES));
auto getUdfEntries = reinterpret_cast<void (*)(UdfEntry*)>(getUdfEntriesSym);
Expand All @@ -101,6 +104,9 @@ std::unordered_set<std::shared_ptr<UdfLoader::UdfSignature>> UdfLoader::getRegis
int numUdaf = getNumUdaf();
// allocate
UdafEntry* udafEntries = static_cast<UdafEntry*>(malloc(sizeof(UdafEntry) * numUdaf));
if (udafEntries == nullptr) {
throw gluten::GlutenException("malloc failed");
}

void* getUdafEntriesSym = loadSymFromLibrary(handle, libPath, GLUTEN_TOSTRING(GLUTEN_GET_UDAF_ENTRIES));
auto getUdafEntries = reinterpret_cast<void (*)(UdafEntry*)>(getUdafEntriesSym);
Expand Down

0 comments on commit 411b317

Please sign in to comment.