Wave Hash Table and Group By Update Variations (facebookincubator#9655)
Summary:
Adds block utilities for radix sort and partitioning. Adds a template for hash tables for join and group by, plus a set of alternative ways of updating a group by. Adds a kernel registry for tracking register use, occupancy, and shared memory.

Adds a buffer view over WaveBufferPtr.
Adds more cases and adjustments to the roundtripMatrix test.

Pull Request resolved: facebookincubator#9655

Reviewed By: Yuhta

Differential Revision: D56705487

Pulled By: oerling

fbshipit-source-id: 3b34b90def0aa7e70d8c04688ca4b7079f31dc68
Ubuntu authored and facebook-github-bot committed Apr 30, 2024
1 parent 1ec53b5 commit 51ac798
Showing 31 changed files with 3,398 additions and 150 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux-build.yml
@@ -91,7 +91,7 @@ jobs:
- name: Make Release Build
env:
MAKEFLAGS: 'NUM_THREADS=8 MAX_HIGH_MEM_JOBS=4 MAX_LINK_JOBS=4'
- CUDA_ARCHITECTURES: 60
+ CUDA_ARCHITECTURES: 70
CUDA_COMPILER: /usr/local/cuda-${CUDA_VERSION}/bin/nvcc
# Without this, nvcc picks /usr/bin/c++, which is GCC 8
CUDA_FLAGS: "-ccbin /opt/rh/gcc-toolset-9/root/usr/bin"
193 changes: 183 additions & 10 deletions velox/experimental/wave/common/Block.cuh
@@ -16,39 +16,52 @@

#pragma once

#include <cub/block/block_radix_sort.cuh>
#include <cub/block/block_reduce.cuh>
#include <cub/block/block_scan.cuh>
#include <cub/block/block_store.cuh>
#include "velox/experimental/wave/common/CudaUtil.cuh"

/// Utilities for operating on booleans and indices with thread blocks.

namespace facebook::velox::wave {

template <
typename T,
int32_t blockSize,
cub::BlockScanAlgorithm Algorithm = cub::BLOCK_SCAN_RAKING>
inline int32_t __device__ __host__ boolToIndicesSharedSize() {
typedef cub::BlockScan<T, blockSize, Algorithm> BlockScanT;

return sizeof(typename BlockScanT::TempStorage);
}

/// Converts an array of flags to an array of indices of the set flags. The
/// index values are offset by 'start'. The number of set flags is returned in
/// 'size', i.e. the output occupies indices[0] to indices[size - 1].
template <
int32_t blockSize,
typename T,
cub::BlockScanAlgorithm Algorithm = cub::BLOCK_SCAN_RAKING,
typename Getter>
- __device__ inline void boolBlockToIndices(
-     Getter getter,
-     int32_t start,
-     int32_t* indices,
-     void* shmem,
-     int32_t& size) {
-   typedef cub::BlockScan<int, blockSize, Algorithm> BlockScanT;
+ __device__ inline void
+ boolBlockToIndices(Getter getter, T start, T* indices, void* shmem, T& size) {
+   typedef cub::BlockScan<T, blockSize, Algorithm> BlockScanT;

auto* temp = reinterpret_cast<typename BlockScanT::TempStorage*>(shmem);
- int data[1];
+ T data[1];
uint8_t flag = getter();
data[0] = flag;
__syncthreads();
- int aggregate;
+ T aggregate;
BlockScanT(*temp).ExclusiveSum(data, data, aggregate);
__syncthreads();
if (flag) {
indices[data[0]] = threadIdx.x + start;
}
if (threadIdx.x == 0) {
size = aggregate;
}
__syncthreads();
}
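
// Illustrative usage sketch, not part of this change: a kernel that gathers
// the indices of rows whose flag is set. The kernel name, the 256-thread
// block size, and the 'flags' array are assumptions for the example. The
// launch would pass boolToIndicesSharedSize<int32_t, 256>() bytes of dynamic
// shared memory.
__global__ void exampleIndicesOfSetFlags(
    const uint8_t* flags,
    int32_t* indices,
    int32_t* size) {
  extern __shared__ char smem[];
  boolBlockToIndices<256, int32_t>(
      [&]() { return flags[threadIdx.x]; }, 0, indices, smem, *size);
}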

template <int32_t blockSize, typename T, typename Getter>
@@ -65,4 +78,164 @@ __device__ inline void blockSum(Getter getter, void* shmem, T* result) {
}
}

template <
int32_t kBlockSize,
int32_t kItemsPerThread,
typename Key,
typename Value>
using RadixSort =
typename cub::BlockRadixSort<Key, kBlockSize, kItemsPerThread, Value>;

template <
int32_t kBlockSize,
int32_t kItemsPerThread,
typename Key,
typename Value>
inline int32_t __host__ __device__ blockSortSharedSize() {
return sizeof(
typename RadixSort<kBlockSize, kItemsPerThread, Key, Value>::TempStorage);
}

template <
int32_t kBlockSize,
int32_t kItemsPerThread,
typename Key,
typename Value,
typename KeyGetter,
typename ValueGetter>
void __device__ blockSort(
KeyGetter keyGetter,
ValueGetter valueGetter,
Key* keyOut,
Value* valueOut,
char* smem) {
using Sort = cub::BlockRadixSort<Key, kBlockSize, kItemsPerThread, Value>;

// Per-thread tile items
Key keys[kItemsPerThread];
Value values[kItemsPerThread];

// Our current block's offset
int blockOffset = 0;

// Load items into a blocked arrangement
for (auto i = 0; i < kItemsPerThread; ++i) {
int32_t idx = blockOffset + i * kBlockSize + threadIdx.x;
values[i] = valueGetter(idx);
keys[i] = keyGetter(idx);
}

__syncthreads();
auto* temp_storage = reinterpret_cast<typename Sort::TempStorage*>(smem);

Sort(*temp_storage).SortBlockedToStriped(keys, values);

// Store output in striped fashion
cub::StoreDirectStriped<kBlockSize>(
threadIdx.x, valueOut + blockOffset, values);
cub::StoreDirectStriped<kBlockSize>(threadIdx.x, keyOut + blockOffset, keys);
__syncthreads();
}
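
// Illustrative usage sketch, not part of this change: one thread block
// sorting 256 x 8 key-value pairs in place. The kernel name and the
// 'keys'/'rows' arrays are assumptions for the example. The launch would
// pass blockSortSharedSize<256, 8, uint16_t, int32_t>() bytes of dynamic
// shared memory.
__global__ void exampleBlockSort(uint16_t* keys, int32_t* rows) {
  extern __shared__ char smem[];
  blockSort<256, 8, uint16_t, int32_t>(
      [&](int32_t i) { return keys[i]; },
      [&](int32_t i) { return rows[i]; },
      keys,
      rows,
      smem);
}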

template <int kBlockSize>
int32_t partitionRowsSharedSize(int32_t numPartitions) {
using Scan = cub::BlockScan<int, kBlockSize>;
auto scanSize = sizeof(typename Scan::TempStorage) + sizeof(int32_t);
int32_t counterSize = sizeof(int32_t) * numPartitions;
if (counterSize <= scanSize) {
return scanSize;
}
static_assert(
sizeof(typename Scan::TempStorage) >= sizeof(int32_t) * kBlockSize);
return scanSize + counterSize;
}

/// Partitions a sequence of indices into runs where the indices
/// belonging to the same partition are contiguous. Indices from 0 to
/// 'numKeys-1' are partitioned into 'partitionedRows', which must
/// have space for 'numKeys' row numbers. The 0-based partition number
/// for row 'i' is given by 'getter(i)'. The row numbers for
/// partition 0 start at 0. The row numbers for partition i start at
/// 'partitionStarts[i-1]'. There must be at least the amount of
/// shared memory given by partitionRowsSharedSize<kBlockSize>(numPartitions).
/// 'ranks' is a temporary array of 'numKeys' elements.
template <int32_t kBlockSize, typename RowNumber, typename Getter>
void __device__ partitionRows(
Getter getter,
uint32_t numKeys,
uint32_t numPartitions,
RowNumber* ranks,
RowNumber* partitionStarts,
RowNumber* partitionedRows) {
using Scan = cub::BlockScan<int32_t, kBlockSize>;
constexpr int32_t kWarpThreads = 1 << CUB_LOG_WARP_THREADS(0);
auto warp = threadIdx.x / kWarpThreads;
auto lane = cub::LaneId();
extern __shared__ __align__(16) char smem[];
auto* counters = reinterpret_cast<uint32_t*>(
    numPartitions <= kBlockSize
        ? smem
        : smem + sizeof(typename Scan::TempStorage));
for (auto i = threadIdx.x; i < numPartitions; i += kBlockSize) {
counters[i] = 0;
}
__syncthreads();
for (auto start = 0; start < numKeys; start += kBlockSize) {
int32_t warpStart = start + warp * kWarpThreads;
if (start >= numKeys) {
break;
}
uint32_t laneMask = warpStart + kWarpThreads <= numKeys
? 0xffffffff
: lowMask<uint32_t>(numKeys - warpStart);
if (warpStart + lane < numKeys) {
int32_t key = getter(warpStart + lane);
uint32_t mask = __match_any_sync(laneMask, key);
int32_t leader = (kWarpThreads - 1) - __clz(mask);
uint32_t cnt = __popc(mask & lowMask<uint32_t>(lane + 1));
uint32_t base;
if (lane == leader) {
base = atomicAdd(&counters[key], cnt);
}
base = __shfl_sync(laneMask, base, leader);
ranks[warpStart + lane] = base + cnt - 1;
}
}
// Prefix sum the counts. All counters must have their final value.
__syncthreads();
auto* temp = reinterpret_cast<typename Scan::TempStorage*>(smem);
int32_t* aggregate = reinterpret_cast<int32_t*>(smem);
for (auto start = 0; start < numPartitions; start += kBlockSize) {
int32_t localCount[1];
localCount[0] =
threadIdx.x + start < numPartitions ? counters[start + threadIdx.x] : 0;
if (threadIdx.x == 0 && start > 0) {
// The sum of the previous round is carried over as the start of this one.
localCount[0] += *aggregate;
}
Scan(*temp).InclusiveSum(localCount, localCount);
if (start + threadIdx.x < numPartitions) {
partitionStarts[start + threadIdx.x] = localCount[0];
}
if (threadIdx.x == kBlockSize - 1 && start + kBlockSize < numPartitions) {
*aggregate = localCount[0];
}
__syncthreads();
}
if (threadIdx.x == 0) {
if (partitionStarts[numPartitions - 1] != numKeys) {
// The partitioning failed to account for all rows; trap the kernel with a
// null store so the error is not silently ignored.
*(long*)0 = 0;
}
}
// Write the row number of each input row into its rank-th position within
// its partition.
for (auto i = threadIdx.x; i < numKeys; i += kBlockSize) {
auto key = getter(i);
auto keyStart = key == 0 ? 0 : partitionStarts[key - 1];
partitionedRows[keyStart + ranks[i]] = i;
}
__syncthreads();
}
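
// Illustrative usage sketch, not part of this change: one 256-thread block
// partitioning 1024 row numbers into 16 partitions. The kernel name and the
// device arrays are assumptions for the example. The launch would pass
// partitionRowsSharedSize<256>(16) bytes of dynamic shared memory.
__global__ void examplePartitionRows(
    const uint8_t* keys,
    int32_t* ranks,
    int32_t* partitionStarts,
    int32_t* partitionedRows) {
  partitionRows<256>(
      [&](auto i) { return keys[i]; },
      1024,
      16,
      ranks,
      partitionStarts,
      partitionedRows);
}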

} // namespace facebook::velox::wave
36 changes: 34 additions & 2 deletions velox/experimental/wave/common/Buffer.h
@@ -32,6 +32,8 @@ class GpuArena;
/// Buffer free list.
class Buffer {
public:
virtual ~Buffer() = default;

template <typename T>
T* as() {
return reinterpret_cast<T*>(ptr_);
@@ -71,9 +73,9 @@
return referenceCount_;
}

- void release();
+ virtual void release();

- private:
+ protected:
// Number of WaveBufferPtrs referencing 'this'.
std::atomic<int32_t> referenceCount_{0};

@@ -108,4 +110,34 @@ static inline void intrusive_ptr_release(Buffer* buffer) {
buffer->release();
}

/// A Buffer that is a view over memory owned elsewhere. The Releaser is
/// destroyed when the view's reference count drops to zero, releasing the
/// hold on the underlying buffer.
template <typename Releaser>
class WaveBufferView : public Buffer {
public:
static WaveBufferPtr create(uint8_t* data, size_t size, Releaser releaser) {
WaveBufferView<Releaser>* view = new WaveBufferView(data, size, releaser);
WaveBufferPtr result(view);
return result;
}

~WaveBufferView() override = default;

void release() override {
if (referenceCount_.fetch_sub(1) == 1) {
// Destroys the releaser, which should release the hold on the underlying
// buffer.
delete this;
}
}

private:
WaveBufferView(uint8_t* data, size_t size, Releaser releaser)
: Buffer(), releaser_(releaser) {
ptr_ = data;
size_ = size;
capacity_ = size;
}

Releaser const releaser_;
};
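
// Illustrative usage sketch, not part of this change: exposing a slice of an
// existing buffer as a Buffer of its own. 'makeView' is a hypothetical helper;
// passing the owning WaveBufferPtr as the Releaser keeps the underlying buffer
// alive for the lifetime of the view.
inline WaveBufferPtr
makeView(const WaveBufferPtr& source, size_t offset, size_t size) {
  return WaveBufferView<WaveBufferPtr>::create(
      source->as<uint8_t>() + offset, size, source);
}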

} // namespace facebook::velox::wave
75 changes: 75 additions & 0 deletions velox/experimental/wave/common/Cuda.cu
@@ -16,10 +16,13 @@

#include <cuda_runtime.h>
#include <fmt/format.h>
#include <iostream>
#include "velox/experimental/wave/common/Cuda.h"
#include "velox/experimental/wave/common/CudaUtil.cuh"
#include "velox/experimental/wave/common/Exception.h"

#include <sstream>

namespace facebook::velox::wave {

void cudaCheck(cudaError_t err, const char* file, int line) {
Expand All @@ -30,6 +33,16 @@ void cudaCheck(cudaError_t err, const char* file, int line) {
fmt::format("Cuda error: {}:{} {}", file, line, cudaGetErrorString(err)));
}

void cudaCheckFatal(cudaError_t err, const char* file, int line) {
if (err == cudaSuccess) {
return;
}
auto error =
fmt::format("Cuda error: {}:{} {}", file, line, cudaGetErrorString(err));
std::cerr << error << std::endl;
exit(1);
}

namespace {
class CudaManagedAllocator : public GpuAllocator {
public:
@@ -208,5 +221,67 @@ float Event::elapsedTime(const Event& start) const {
CUDA_CHECK(cudaEventElapsedTime(&ms, start.event_->event, event_->event));
return ms;
}
namespace {
struct KernelEntry {
const char* name;
const void* func;
};

int32_t numKernelEntries = 0;
KernelEntry kernelEntries[200];
} // namespace

bool registerKernel(const char* name, const void* func) {
  // Check the capacity before writing so the table cannot overflow.
  if (numKernelEntries >=
      static_cast<int32_t>(sizeof(kernelEntries) / sizeof(kernelEntries[0]))) {
    LOG(ERROR) << "Reserve more space in kernelEntries";
    exit(1);
  }
  kernelEntries[numKernelEntries].name = name;
  kernelEntries[numKernelEntries].func = func;
  ++numKernelEntries;
  return true;
}
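
// Illustrative usage sketch, not part of this change: a kernel's translation
// unit registers it once at static-initialization time so that
// getRegisteredKernelInfo() and printKernels() can report on it. The kernel
// and variable names are assumptions for the example.
__global__ void exampleNoopKernel() {}

static bool exampleNoopKernelRegistered = registerKernel(
    "exampleNoopKernel", reinterpret_cast<const void*>(exampleNoopKernel));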

KernelInfo kernelInfo(const void* func) {
cudaFuncAttributes attrs;
CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, func));
KernelInfo info;
info.numRegs = attrs.numRegs;
info.maxThreadsPerBlock = attrs.maxThreadsPerBlock;
info.sharedMemory = attrs.sharedSizeBytes;
int max;
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&max, func, 256, 0);
info.maxOccupancy0 = max;
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&max, func, 256, 16);
info.maxOccupancy16 = max;

return info;
}

std::string KernelInfo::toString() const {
std::stringstream out;
out << "NumRegs=" << numRegs << " maxThreadsPerBlock= " << maxThreadsPerBlock
<< " sharedMemory=" << sharedMemory
<< " occupancy 256, 0=" << maxOccupancy0
<< " occupancy 256,16=" << maxOccupancy16;
return out.str();
}

KernelInfo getRegisteredKernelInfo(const char* name) {
for (auto i = 0; i < numKernelEntries; ++i) {
if (strcmp(name, kernelEntries[i].name) == 0) {
return kernelInfo(kernelEntries[i].func);
}
}
return KernelInfo();
}

void printKernels() {
for (auto i = 0; i < numKernelEntries; ++i) {
std::cout << kernelEntries[i].name << " - "
<< getRegisteredKernelInfo(kernelEntries[i].name).toString()
<< std::endl;
}
}

} // namespace facebook::velox::wave