Skip to content

Commit

Permalink
Merge branch 'facebookincubator:main' into add_join_fuzzer_experimental
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpai authored Jan 10, 2024
2 parents f7d1edc + f15e096 commit 6eda736
Show file tree
Hide file tree
Showing 28 changed files with 421 additions and 156 deletions.
3 changes: 3 additions & 0 deletions CMake/resolve_dependency_modules/cpr.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ FetchContent_Declare(
${CMAKE_CURRENT_LIST_DIR}/cpr/cpr-libcurl-compatible.patch)
set(BUILD_SHARED_LIBS OFF)
set(CPR_USE_SYSTEM_CURL OFF)
# ZLIB has already been found by find_package(ZLIB, REQUIRED), set CURL_ZLIB=OFF
# to save compile time.
set(CURL_ZLIB OFF)
FetchContent_MakeAvailable(cpr)
# libcpr in its CMakeLists.txt file disables the BUILD_TESTING globally when
# CPR_USE_SYSTEM_CURL=OFF. unset BUILD_TESTING here.
Expand Down
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ if(${VELOX_BUILD_TESTING})
endif()

if(${VELOX_ENABLE_BENCHMARKS})
set(VELOX_BUILD_TEST_UTILS ON)
set(VELOX_ENABLE_BENCHMARKS_BASIC ON)
endif()

Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ TEST_F(AsyncDataCacheTest, cacheStats) {
"Prefetch entries: 0 bytes: 0B\n"
"Alloc Megaclocks 0\n"
"Allocated pages: 0 cached pages: 0\n"
"Backing: Memory Allocator[MMAP capacity 16.00KB allocated pages 0 mapped pages 0 external mapped pages 0\n"
"Backing: Memory Allocator[MMAP total capacity 64.00MB free capacity 64.00MB allocated pages 0 mapped pages 0 external mapped pages 0\n"
"[size 1: 0(0MB) allocated 0 mapped]\n"
"[size 2: 0(0MB) allocated 0 mapped]\n"
"[size 4: 0(0MB) allocated 0 mapped]\n"
Expand Down
10 changes: 9 additions & 1 deletion velox/common/memory/Allocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ Allocation::~Allocation() {
}
}

void Allocation::append(uint8_t* address, int32_t numPages) {
void Allocation::append(uint8_t* address, MachinePageCount numPages) {
numPages_ += numPages;
VELOX_CHECK(
runs_.empty() || address != runs_.back().data(),
"Appending a duplicate address into a PageRun");
// Split an allocation into multiple PageRuns if the number of pages are
// greater than 'PageRun::kMaxPagesInRun'.
while (numPages > PageRun::kMaxPagesInRun) {
runs_.emplace_back(address, PageRun::kMaxPagesInRun);
address += AllocationTraits::pageBytes(PageRun::kMaxPagesInRun);
numPages -= PageRun::kMaxPagesInRun;
}
runs_.emplace_back(address, numPages);
}

void Allocation::appendMove(Allocation& other) {
for (auto& run : other.runs_) {
numPages_ += run.numPages();
Expand Down
5 changes: 3 additions & 2 deletions velox/common/memory/Allocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class Allocation {
VELOX_CHECK(numPages_ != 0 || pool_ == nullptr);
}

void append(uint8_t* address, int32_t numPages);
void append(uint8_t* address, MachinePageCount numPages);

void clear() {
runs_.clear();
Expand All @@ -193,6 +193,7 @@ class Allocation {
VELOX_FRIEND_TEST(MemoryAllocatorTest, allocationClass2);
VELOX_FRIEND_TEST(AllocationTest, append);
VELOX_FRIEND_TEST(AllocationTest, appendMove);
VELOX_FRIEND_TEST(AllocationTest, multiplePageRuns);
};

/// Represents a run of contiguous pages that do not belong to any size class.
Expand Down Expand Up @@ -263,7 +264,7 @@ class ContiguousAllocation {

// Adjusts 'size' towards 'maxSize' by 'increment' pages. Rounds
// 'increment' to huge pages, since this is the unit of growth of
// RSS for large contiguous runs. Increases the reservation in in
// RSS for large contiguous runs. Increases the reservation in
// 'pool_' and its allocator. May fail by cap exceeded. If failing,
// the size is not changed. 'size_' cannot exceed 'maxSize_'.
void grow(MachinePageCount increment);
Expand Down
40 changes: 24 additions & 16 deletions velox/common/memory/MallocAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry(
}
}

std::vector<void*> pages;
std::unordered_map<void*, MachinePageCount> pages;
pages.reserve(mix.numSizes);
for (int32_t i = 0; i < mix.numSizes; ++i) {
MachinePageCount numSizeClassPages =
Expand All @@ -99,14 +99,14 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry(
setAllocatorFailureMessage(errorMsg);
break;
}
pages.emplace_back(ptr);
pages.emplace(ptr, numSizeClassPages);
out.append(reinterpret_cast<uint8_t*>(ptr), numSizeClassPages); // NOLINT
}

if (pages.size() != mix.numSizes) {
// Failed to allocate memory using malloc. Free any malloced pages and
// return false.
for (auto ptr : pages) {
for (const auto& [ptr, numMachinePages] : pages) {
::free(ptr);
}
out.clear();
Expand All @@ -123,8 +123,8 @@ bool MallocAllocator::allocateNonContiguousWithoutRetry(
}

{
std::lock_guard<std::mutex> l(mallocsMutex_);
mallocs_.insert(pages.begin(), pages.end());
std::lock_guard<std::mutex> l(nonContiguousMallocsMutex_);
nonContiguousMallocs_.insert(pages.begin(), pages.end());
}

// Successfully allocated all pages.
Expand Down Expand Up @@ -242,20 +242,28 @@ int64_t MallocAllocator::freeNonContiguous(Allocation& allocation) {
MachinePageCount numFreed = 0;
for (int32_t i = 0; i < allocation.numRuns(); ++i) {
Allocation::PageRun run = allocation.runAt(i);
numFreed += run.numPages();
void* ptr = run.data();
int64_t numPages;
{
std::lock_guard<std::mutex> l(mallocsMutex_);
const auto ret = mallocs_.erase(ptr);
VELOX_CHECK_EQ(ret, 1, "Bad free page pointer: {}", ptr);
std::lock_guard<std::mutex> l(nonContiguousMallocsMutex_);
const auto it = nonContiguousMallocs_.find(ptr);
VELOX_CHECK(
it != nonContiguousMallocs_.end(), "Bad free page pointer: {}", ptr);
numPages = static_cast<int64_t>(it->second);
nonContiguousMallocs_.erase(it);
}
stats_.recordFree(
std::min<int64_t>(
AllocationTraits::pageBytes(sizeClassSizes_.back()),
AllocationTraits::pageBytes(run.numPages())),
[&]() {
::free(ptr); // NOLINT
});
numFreed += numPages;
stats_.recordFree(AllocationTraits::pageBytes(numPages), [&]() {
::free(ptr); // NOLINT
});
numPages -= static_cast<int64_t>(run.numPages());
// It is possible that an allocation spans multiple PageRuns.
while (numPages > 0) {
++i;
VELOX_CHECK_LT(i, allocation.numRuns());
numPages -= allocation.runAt(i).numPages();
}
VELOX_CHECK_EQ(numPages, 0);
}

const auto freedBytes = AllocationTraits::pageBytes(numFreed);
Expand Down
14 changes: 9 additions & 5 deletions velox/common/memory/MallocAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,15 @@ class MallocAllocator : public MemoryAllocator {
/// Current total allocated bytes by this 'MallocAllocator'.
std::atomic<int64_t> allocatedBytes_{0};

/// Mutex for 'mallocs_'.
std::mutex mallocsMutex_;

/// Tracks malloc'd pointers to detect bad frees.
std::unordered_set<void*> mallocs_;
/// Mutex for 'nonContiguousMallocs_'.
std::mutex nonContiguousMallocsMutex_;

/// Tracks malloc'd pointers and the corresponding MachinePageCount in
/// non-contiguous allocations to detect bad frees.
/// Since an allocation can span across multiple PageRuns, we need to store
/// the MachinePageCount for each allocation to identify allocation boundaries
/// across the PageRuns.
std::unordered_map<void*, MachinePageCount> nonContiguousMallocs_;

std::shared_ptr<Cache> cache_;
};
Expand Down
8 changes: 0 additions & 8 deletions velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ MemoryAllocator::SizeMix MemoryAllocator::allocationSize(
++numUnits;
needed -= size;
}
if (FOLLY_UNLIKELY(numUnits * size > Allocation::PageRun::kMaxPagesInRun)) {
VELOX_MEM_ALLOC_ERROR(fmt::format(
"Too many pages {} to allocate, the number of units {} at size class of {} exceeds the PageRun limit {}",
numPages,
numUnits,
size,
Allocation::PageRun::kMaxPagesInRun));
}
mix.sizeCounts[mix.numSizes] = numUnits;
pagesToAlloc += numUnits * size;
mix.sizeIndices[mix.numSizes++] = sizeIndex;
Expand Down
9 changes: 7 additions & 2 deletions velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,13 @@ bool MmapAllocator::useMalloc(uint64_t bytes) {

std::string MmapAllocator::toString() const {
std::stringstream out;
out << "Memory Allocator[" << kindString(kind_) << " capacity "
<< ((capacity_ == kMaxMemory) ? "UNLIMITED" : succinctBytes(capacity_))
out << "Memory Allocator[" << kindString(kind_) << " total capacity "
<< ((capacity_ == kMaxMemory) ? "UNLIMITED" : succinctBytes(capacity()))
<< " free capacity "
<< ((capacity_ == kMaxMemory)
? "UNLIMITED"
: succinctBytes(
capacity() - AllocationTraits::pageBytes(numAllocated())))
<< " allocated pages " << numAllocated_ << " mapped pages " << numMapped_
<< " external mapped pages " << numExternalMapped_ << std::endl;
for (auto& sizeClass : sizeClasses_) {
Expand Down
22 changes: 22 additions & 0 deletions velox/common/memory/tests/AllocationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,26 @@ TEST_F(AllocationTest, appendMove) {
allocation.clear();
}

TEST_F(AllocationTest, multiplePageRuns) {
Allocation allocation;
const uint64_t startBufAddrValue = 4096;
uint8_t* const firstBufAddr = reinterpret_cast<uint8_t*>(startBufAddrValue);
allocation.append(firstBufAddr, Allocation::PageRun::kMaxPagesInRun + 100);
ASSERT_EQ(allocation.numPages(), Allocation::PageRun::kMaxPagesInRun + 100);
ASSERT_EQ(allocation.numRuns(), 2);

uint8_t* const secondBufAddr = reinterpret_cast<uint8_t*>(
startBufAddrValue + AllocationTraits::pageBytes(allocation.numPages()));
allocation.append(secondBufAddr, Allocation::PageRun::kMaxPagesInRun - 100);
ASSERT_EQ(allocation.numPages(), Allocation::PageRun::kMaxPagesInRun * 2);
ASSERT_EQ(allocation.numRuns(), 3);

uint8_t* const thirdBufAddr = reinterpret_cast<uint8_t*>(
firstBufAddr + 2 * AllocationTraits::pageBytes(allocation.numPages()));
allocation.append(thirdBufAddr, Allocation::PageRun::kMaxPagesInRun * 2);
ASSERT_EQ(allocation.numPages(), Allocation::PageRun::kMaxPagesInRun * 4);
ASSERT_EQ(allocation.numRuns(), 5);
allocation.clear();
}

} // namespace facebook::velox::memory
19 changes: 9 additions & 10 deletions velox/common/memory/tests/MemoryAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct ProcessSize {
};
} // namespace

static constexpr uint64_t kCapacityBytes = 256UL * 1024 * 1024;
static constexpr uint64_t kCapacityBytes = 1024UL * 1024 * 1024;
static constexpr MachinePageCount kCapacityPages =
(kCapacityBytes / AllocationTraits::kPageSize);

Expand Down Expand Up @@ -85,12 +85,12 @@ class MemoryAllocatorTest : public testing::TestWithParam<bool> {
ASSERT_EQ(instance_->kind(), MemoryAllocator::Kind::kMmap);
ASSERT_EQ(
instance_->toString(),
"Memory Allocator[MMAP capacity 64.00KB allocated pages 0 mapped pages 0 external mapped pages 0\n[size 1: 0(0MB) allocated 0 mapped]\n[size 2: 0(0MB) allocated 0 mapped]\n[size 4: 0(0MB) allocated 0 mapped]\n[size 8: 0(0MB) allocated 0 mapped]\n[size 16: 0(0MB) allocated 0 mapped]\n[size 32: 0(0MB) allocated 0 mapped]\n[size 64: 0(0MB) allocated 0 mapped]\n[size 128: 0(0MB) allocated 0 mapped]\n[size 256: 0(0MB) allocated 0 mapped]\n]");
"Memory Allocator[MMAP total capacity 1.00GB free capacity 1.00GB allocated pages 0 mapped pages 0 external mapped pages 0\n[size 1: 0(0MB) allocated 0 mapped]\n[size 2: 0(0MB) allocated 0 mapped]\n[size 4: 0(0MB) allocated 0 mapped]\n[size 8: 0(0MB) allocated 0 mapped]\n[size 16: 0(0MB) allocated 0 mapped]\n[size 32: 0(0MB) allocated 0 mapped]\n[size 64: 0(0MB) allocated 0 mapped]\n[size 128: 0(0MB) allocated 0 mapped]\n[size 256: 0(0MB) allocated 0 mapped]\n]");
} else {
ASSERT_EQ(instance_->kind(), MemoryAllocator::Kind::kMalloc);
ASSERT_EQ(
instance_->toString(),
"Memory Allocator[MALLOC capacity 256.00MB allocated bytes 0 allocated pages 0 mapped pages 0]");
"Memory Allocator[MALLOC capacity 1.00GB allocated bytes 0 allocated pages 0 mapped pages 0]");
}
ASSERT_EQ(
MemoryAllocator::kindString(static_cast<MemoryAllocator::Kind>(100)),
Expand Down Expand Up @@ -1342,16 +1342,18 @@ TEST_P(MemoryAllocatorTest, StlMemoryAllocator) {
}
}

TEST_P(MemoryAllocatorTest, badNonContiguousAllocation) {
TEST_P(MemoryAllocatorTest, nonContiguousAllocationBounds) {
// Set the num of pages to allocate exceeds one PageRun limit.
constexpr MachinePageCount kNumPages =
Allocation::PageRun::kMaxPagesInRun + 1;
std::unique_ptr<Allocation> allocation(new Allocation());
ASSERT_THROW(
instance_->allocateNonContiguous(kNumPages, *allocation),
VeloxRuntimeError);
ASSERT_TRUE(instance_->allocateNonContiguous(kNumPages, *allocation));
instance_->freeNonContiguous(*allocation);
ASSERT_TRUE(instance_->allocateNonContiguous(kNumPages - 1, *allocation));
instance_->freeNonContiguous(*allocation);
ASSERT_TRUE(instance_->allocateNonContiguous(
Allocation::PageRun::kMaxPagesInRun * 2, *allocation));
instance_->freeNonContiguous(*allocation);
}

TEST_P(MemoryAllocatorTest, contiguousAllocation) {
Expand Down Expand Up @@ -1442,9 +1444,6 @@ TEST_P(MemoryAllocatorTest, allocatorCapacity) {
TEST_P(MemoryAllocatorTest, allocatorCapacityWithThreads) {
std::atomic<int64_t> numOps{0};
const int64_t numMaxOps = 100000;
// We need large enough (at least close to capacity) allocations to breach the
// capacity limit in this test.
EXPECT_GT(Allocation::PageRun::kMaxPagesInRun, kCapacityPages / 4 * 3);
const int64_t nonContAllocPages = Allocation::PageRun::kMaxPagesInRun;

std::function<void()> nonContiguousReserveFail = [&, this]() {
Expand Down
13 changes: 9 additions & 4 deletions velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1784,21 +1784,26 @@ TEST_P(MemoryPoolTest, contiguousAllocateGrowExceedMemoryPoolLimit) {
ASSERT_EQ(allocation.numPages(), kMaxNumPages / 2);
}

TEST_P(MemoryPoolTest, badNonContiguousAllocation) {
TEST_P(MemoryPoolTest, nonContiguousAllocationBounds) {
auto manager = getMemoryManager();
auto pool = manager->addLeafPool("badNonContiguousAllocation");
auto pool = manager->addLeafPool("nonContiguousAllocationBounds");
Allocation allocation;
// Bad zero page allocation size.
ASSERT_THROW(pool->allocateNonContiguous(0, allocation), VeloxRuntimeError);

// Set the num of pages to allocate exceeds one PageRun limit.
constexpr MachinePageCount kNumPages =
Allocation::PageRun::kMaxPagesInRun + 1;
ASSERT_THROW(
pool->allocateNonContiguous(kNumPages, allocation), VeloxRuntimeError);
pool->allocateNonContiguous(kNumPages, allocation);
ASSERT_GE(allocation.numPages(), kNumPages);
pool->freeNonContiguous(allocation);
pool->allocateNonContiguous(kNumPages - 1, allocation);
ASSERT_GE(allocation.numPages(), kNumPages - 1);
pool->freeNonContiguous(allocation);
pool->allocateNonContiguous(
Allocation::PageRun::kMaxPagesInRun * 2, allocation);
ASSERT_GE(allocation.numPages(), Allocation::PageRun::kMaxPagesInRun * 2);
pool->freeNonContiguous(allocation);
}

TEST_P(MemoryPoolTest, nonContiguousAllocateExceedLimit) {
Expand Down
36 changes: 19 additions & 17 deletions velox/connectors/hive/HivePartitionUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,23 @@ template <TypeKind Kind>
std::pair<std::string, std::string> makePartitionKeyValueString(
const BaseVector* partitionVector,
vector_size_t row,
const std::string& name) {
const std::string& name,
bool isDate) {
using T = typename TypeTraits<Kind>::NativeType;
if (partitionVector->as<SimpleVector<T>>()->isNullAt(row)) {
return std::make_pair(name, "");
}
if (isDate) {
return std::make_pair(
name,
DATE()->toString(
partitionVector->as<SimpleVector<int32_t>>()->valueAt(row)));
}
return std::make_pair(
name,
makePartitionValueString(
partitionVector->as<SimpleVector<T>>()->valueAt(row)));
};
}

} // namespace

Expand All @@ -66,21 +76,13 @@ std::vector<std::pair<std::string, std::string>> extractPartitionKeyValues(
vector_size_t row) {
std::vector<std::pair<std::string, std::string>> partitionKeyValues;
for (auto i = 0; i < partitionsVector->childrenSize(); i++) {
if (partitionsVector->childAt(i)->type()->isDate()) {
auto partitionVector = partitionsVector->childAt(i)->loadedVector();
auto partitionName = asRowType(partitionsVector->type())->nameOf(i);
partitionKeyValues.push_back(
{partitionName,
DATE()->toString(
partitionVector->as<SimpleVector<int32_t>>()->valueAt(row))});
} else {
partitionKeyValues.push_back(PARTITION_TYPE_DISPATCH(
makePartitionKeyValueString,
partitionsVector->childAt(i)->typeKind(),
partitionsVector->childAt(i)->loadedVector(),
row,
asRowType(partitionsVector->type())->nameOf(i)));
}
partitionKeyValues.push_back(PARTITION_TYPE_DISPATCH(
makePartitionKeyValueString,
partitionsVector->childAt(i)->typeKind(),
partitionsVector->childAt(i)->loadedVector(),
row,
asRowType(partitionsVector->type())->nameOf(i),
partitionsVector->childAt(i)->type()->isDate()));
}
return partitionKeyValues;
}
Expand Down
Loading

0 comments on commit 6eda736

Please sign in to comment.