Nulls in Wave selective reader, expressions (facebookincubator#10179)
Summary:
Adds a test with pushdown filters and nulls.

Adds benchmarks on switch-case, indirect function call, and computed goto dispatch, as well as different memory access widths and strides.
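
For reference, below is a minimal host-side sketch (not part of this change) of the three dispatch styles the benchmark compares. The operation table and the runSwitch / runIndirect / runComputedGoto helpers are made-up names, and the computed-goto variant relies on the GCC/Clang label-address extension.

#include <cstdint>
#include <cstdio>

static int64_t addOp(int64_t x) { return x + 1; }
static int64_t mulOp(int64_t x) { return x * 3; }

// Dispatch via switch-case.
int64_t runSwitch(const uint8_t* ops, int32_t n) {
  int64_t acc = 0;
  for (int32_t i = 0; i < n; ++i) {
    switch (ops[i]) {
      case 0: acc = addOp(acc); break;
      case 1: acc = mulOp(acc); break;
    }
  }
  return acc;
}

// Dispatch via an indirect call through a function table.
int64_t runIndirect(const uint8_t* ops, int32_t n) {
  static int64_t (*const table[])(int64_t) = {addOp, mulOp};
  int64_t acc = 0;
  for (int32_t i = 0; i < n; ++i) {
    acc = table[ops[i]](acc);
  }
  return acc;
}

// Dispatch via computed goto (GCC/Clang '&&label' extension).
int64_t runComputedGoto(const uint8_t* ops, int32_t n) {
  void* labels[] = {&&doAdd, &&doMul, &&done};
  int64_t acc = 0;
  int32_t i = 0;
  goto *labels[i < n ? ops[i] : 2];
doAdd:
  acc = addOp(acc);
  ++i;
  goto *labels[i < n ? ops[i] : 2];
doMul:
  acc = mulOp(acc);
  ++i;
  goto *labels[i < n ? ops[i] : 2];
done:
  return acc;
}

int main() {
  uint8_t ops[] = {0, 1, 0, 0, 1};
  printf(
      "%lld %lld %lld\n",
      (long long)runSwitch(ops, 5),
      (long long)runIndirect(ops, 5),
      (long long)runComputedGoto(ops, 5));
  return 0;
}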

Pull Request resolved: facebookincubator#10179

Reviewed By: Yuhta

Differential Revision: D58528431

Pulled By: oerling

fbshipit-source-id: dbda8dd3ca2f2759d3359791ba2da66adaf978e9
Orri Erling authored and facebook-github-bot committed Jun 13, 2024
1 parent 2da1dd0 commit 9a4e882
Showing 24 changed files with 1,208 additions and 185 deletions.
87 changes: 48 additions & 39 deletions velox/experimental/wave/common/Bits.cuh
@@ -312,6 +312,14 @@ inline __device__ bool isDense(const int32_t* rows, int32_t i, int32_t last) {
return rows[last - 1] - rows[i] == last - i - 1;
}

struct NonNullState {
/// Number of non-nulls below 'nonNullsBelowRow'.
int32_t nonNullsBelow;
/// First row not included in the count in 'nonNullsBelow'.
int32_t nonNullsBelowRow;
int32_t temp[256 / kWarpThreads];
};

/// Identifies threads that have a non-null value in a 256 thread
/// block. Consecutive threads process consecutive values. If the thread falls
/// on a null, -1 is returned, else the ordinal of the non-null corresponding to
@@ -324,33 +332,40 @@ inline __device__ int32_t nonNullIndex256(
char* nulls,
int32_t bitOffset,
int32_t numRows,
int32_t* nonNullOffset,
int32_t* temp) {
NonNullState* state) {
if (threadIdx.x == 0) {
state->nonNullsBelow += countBits(
reinterpret_cast<uint64_t*>(nulls), state->nonNullsBelowRow, bitOffset);
state->nonNullsBelowRow = bitOffset;
}
__syncthreads();
int32_t group = threadIdx.x / 32;
uint32_t bits =
threadIdx.x < numRows ? loadBits32(nulls, bitOffset + group * 32, 32) : 0;
if (threadIdx.x == kWarpThreads * group) {
temp[group] = __popc(bits);
state->temp[group] = __popc(bits);
}
auto previousOffset = *nonNullOffset;
auto previousOffset = state->nonNullsBelow;
__syncthreads();
if (threadIdx.x < kWarpThreads) {
using Scan = cub::WarpScan<uint32_t, 8>;
uint32_t count =
threadIdx.x < (blockDim.x / kWarpThreads) ? temp[threadIdx.x] : 0;
uint32_t count = threadIdx.x < (blockDim.x / kWarpThreads)
? state->temp[threadIdx.x]
: 0;
uint32_t start;
Scan(*reinterpret_cast<Scan::TempStorage*>(temp))
Scan(*reinterpret_cast<Scan::TempStorage*>(state->temp))
.ExclusiveSum(count, start);
if (threadIdx.x < blockDim.x / kWarpThreads) {
temp[threadIdx.x] = start;
state->temp[threadIdx.x] = start;
if (threadIdx.x == (blockDim.x / kWarpThreads) - 1 && numRows == 256) {
*nonNullOffset += start + count;
state->nonNullsBelow += start + count;
state->nonNullsBelowRow = bitOffset + numRows;
}
}
}
__syncthreads();
if (bits & (1 << (threadIdx.x & 31))) {
return temp[group] + previousOffset +
return state->temp[group] + previousOffset +
__popc(bits & lowMask<uint32_t>(threadIdx.x & 31));
} else {
return -1;
@@ -362,67 +377,61 @@ inline __device__ int32_t nonNullIndex256(
/// cover rows[numRows-1] bits. Each thread returns -1 if
/// rows[threadIdx.x] falls on a null and the corresponding index in
/// non-null rows otherwise. This can be called multiple times on
/// consecutive groups of 256 row numbers. the non-null offset of
/// the last row is carried in 'non-nulloffset'. 'rowOffset' is the
/// offset of the first row of the batch of 256 in 'rows', so it has
/// 0, 256, 512.. in consecutive calls.
/// consecutive groups of 256 row numbers. The running count of
/// non-nulls is carried in 'state->nonNullsBelow' and covers the rows
/// below 'state->nonNullsBelowRow', so the count available to the
/// first row of a batch is the number of non-nulls from
/// 'state->nonNullsBelowRow' up to 'rows[0]'.
inline __device__ int32_t nonNullIndex256Sparse(
char* nulls,
int32_t* rows,
int32_t rowOffset,
int32_t numRows,
int32_t* nonNullOffset,
int32_t* extraNonNulls,
int32_t* temp) {
NonNullState* state) {
using Scan32 = cub::WarpScan<uint32_t>;
auto rowIdx = rowOffset + threadIdx.x;
bool isNull = true;
uint32_t nonNullsBelow = 0;
if (rowIdx < numRows) {
isNull = !isBitSet(nulls, rows[rowIdx]);
if (threadIdx.x < numRows) {
isNull = !isBitSet(nulls, rows[threadIdx.x]);
nonNullsBelow = !isNull;
int32_t previousRow = rowIdx == 0 ? 0 : rows[rowIdx - 1];
int32_t previousRow =
threadIdx.x == 0 ? state->nonNullsBelowRow : rows[threadIdx.x - 1] + 1;
nonNullsBelow += countBits(
reinterpret_cast<uint64_t*>(nulls), previousRow + 1, rows[rowIdx]);
reinterpret_cast<uint64_t*>(nulls), previousRow, rows[threadIdx.x]);
}
Scan32(*detail::warpScanTemp(temp))
Scan32(*detail::warpScanTemp(state->temp))
.InclusiveSum(nonNullsBelow, nonNullsBelow);
int32_t previousOffset = state->nonNullsBelow;
if (detail::isLastInWarp()) {
// The last thread of the warp writes warp total.
temp[threadIdx.x / kWarpThreads] = nonNullsBelow;
state->temp[threadIdx.x / kWarpThreads] = nonNullsBelow;
}
int32_t previousOffset = *nonNullOffset;
__syncthreads();
if (threadIdx.x < kWarpThreads) {
int32_t start = 0;
if (threadIdx.x < blockDim.x / kWarpThreads) {
start = temp[threadIdx.x];
start = state->temp[threadIdx.x];
}
using Scan8 = cub::WarpScan<int32_t, 8>;
int32_t sum = 0;
Scan8(*reinterpret_cast<Scan8::TempStorage*>(temp))
Scan8(*reinterpret_cast<Scan8::TempStorage*>(state->temp))
.ExclusiveSum(start, sum);
if (threadIdx.x == (blockDim.x / kWarpThreads) - 1) {
// The last sum thread increments the running count of non-nulls
// by the block total. It adds the optional extraNonNulls to the
// total between the barriers. 'extraNonNulls' or the running
// non-null count do not affect the result of the 256 lanes of
// this.
if (extraNonNulls) {
start += *extraNonNulls;
*extraNonNulls = 0;
}
*nonNullOffset += start + sum;
// by the block total.
state->nonNullsBelow += start + sum;
state->nonNullsBelowRow = rows[numRows - 1] + 1;
}
if (threadIdx.x < blockDim.x / kWarpThreads) {
temp[threadIdx.x] = sum;
state->temp[threadIdx.x] = sum;
}
}
__syncthreads();
if (isNull) {
return -1;
}
return temp[threadIdx.x / kWarpThreads] + previousOffset + nonNullsBelow - 1;
return state->temp[threadIdx.x / kWarpThreads] + previousOffset +
nonNullsBelow - 1;
}

} // namespace facebook::velox::wave
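
As a reading aid, here is a single-threaded sketch of what nonNullIndex256 and nonNullIndex256Sparse compute for a batch of selected rows, using the same bookkeeping as NonNullState. It is illustrative only; isBitSetHost and nonNullIndexReference are made-up names, not code from this change.

#include <cstdint>
#include <vector>

// A set bit in the null mask means non-null, as in the device code above.
inline bool isBitSetHost(const char* bits, int32_t idx) {
  return (static_cast<uint8_t>(bits[idx / 8]) >> (idx % 8)) & 1;
}

// Returns one index per entry of 'rows': -1 for a null row, otherwise the
// ordinal of the row among all non-null rows. 'nonNullsBelow' and
// 'nonNullsBelowRow' carry state across consecutive batches, like
// NonNullState.
std::vector<int32_t> nonNullIndexReference(
    const char* nulls,
    const std::vector<int32_t>& rows,
    int32_t& nonNullsBelow,
    int32_t& nonNullsBelowRow) {
  std::vector<int32_t> result(rows.size());
  for (size_t i = 0; i < rows.size(); ++i) {
    // Count the non-nulls between the last counted row and this row.
    for (int32_t r = nonNullsBelowRow; r < rows[i]; ++r) {
      nonNullsBelow += isBitSetHost(nulls, r);
    }
    nonNullsBelowRow = rows[i];
    result[i] = isBitSetHost(nulls, rows[i]) ? nonNullsBelow : -1;
  }
  return result;
}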
14 changes: 6 additions & 8 deletions velox/experimental/wave/common/Block.cuh
@@ -88,16 +88,17 @@ bool256ToIndices(Getter8 getter8, T start, T* indices, T& size, char* smem) {
uint64_t bits = getter8(group) & 0x0101010101010101;
if ((threadIdx.x & 7) == 0) {
smem16[group] = __popcll(bits);
if (threadIdx.x == blockDim.x - 8) {
smem16[32] = smem16[31];
}
}
__syncthreads();
if (threadIdx.x < 32) {
auto* temp = reinterpret_cast<typename Scan::TempStorage*>((smem + 72));
uint16_t data = smem16[threadIdx.x];
Scan(*temp).ExclusiveSum(data, data);
smem16[threadIdx.x] = data;
uint16_t result;
Scan(*temp).ExclusiveSum(data, result);
smem16[threadIdx.x] = result;
if (threadIdx.x == 31) {
size = data + result;
}
}
__syncthreads();
int32_t tidInGroup = threadIdx.x & 7;
@@ -106,9 +107,6 @@ bool256ToIndices(Getter8 getter8, T start, T* indices, T& size, char* smem) {
smem16[group] + __popcll(bits & lowMask<uint64_t>(tidInGroup * 8));
indices[base] = threadIdx.x + start;
}
if (threadIdx.x == 0) {
size = smem16[31] + smem16[32];
}
__syncthreads();
}

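
For context, an illustrative host-side statement of the bool256ToIndices contract that the corrected scan preserves (bool256ToIndicesReference is a made-up name): gather 'start + i' for every set flag and report how many were gathered. The GPU version produces the same result with warp scans over 8-flag groups.

#include <cstdint>

void bool256ToIndicesReference(
    const uint8_t* flags, // 256 flag bytes; non-zero means selected.
    int32_t start,
    int32_t* indices,
    int32_t& size) {
  int32_t out = 0;
  for (int32_t i = 0; i < 256; ++i) {
    if (flags[i]) {
      indices[out++] = start + i;
    }
  }
  size = out;
}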
6 changes: 3 additions & 3 deletions velox/experimental/wave/common/Cuda.cu
@@ -252,8 +252,8 @@ KernelInfo kernelInfo(const void* func) {
int max;
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&max, func, 256, 0);
info.maxOccupancy0 = max;
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&max, func, 256, 16);
info.maxOccupancy16 = max;
cudaOccupancyMaxActiveBlocksPerMultiprocessor(&max, func, 256, 256 * 32);
info.maxOccupancy32 = max;

return info;
}
@@ -263,7 +263,7 @@ std::string KernelInfo::toString() const {
out << "NumRegs=" << numRegs << " maxThreadsPerBlock= " << maxThreadsPerBlock
<< " sharedMemory=" << sharedMemory
<< " occupancy 256, 0=" << maxOccupancy0
<< " occupancy 256,16=" << maxOccupancy16;
<< " occupancy 256,32=" << maxOccupancy32;
return out.str();
}

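
A hedged example of the occupancy query the code now uses: blocks per SM for a 256-thread block that also requests 256 * 32 bytes of dynamic shared memory. myKernel is a placeholder, not a Wave kernel.

#include <cuda_runtime.h>
#include <cstdint>
#include <cstdio>

// Placeholder kernel that uses dynamic shared memory.
__global__ void myKernel(int32_t* out) {
  extern __shared__ int32_t scratch[];
  scratch[threadIdx.x] = threadIdx.x;
  __syncthreads();
  out[threadIdx.x] = scratch[blockDim.x - 1 - threadIdx.x];
}

int main() {
  int blocksPerSm = 0;
  cudaOccupancyMaxActiveBlocksPerMultiprocessor(
      &blocksPerSm, myKernel, /*blockSize=*/256, /*dynamicSMemSize=*/256 * 32);
  printf("max active blocks per SM: %d\n", blocksPerSm);
  return 0;
}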
2 changes: 1 addition & 1 deletion velox/experimental/wave/common/Cuda.h
@@ -191,7 +191,7 @@ struct KernelInfo {
int32_t maxThreadsPerBlock;
int32_t sharedMemory{0};
int32_t maxOccupancy0{0};
int32_t maxOccupancy16{0};
int32_t maxOccupancy32{0};

std::string toString() const;
};
17 changes: 14 additions & 3 deletions velox/experimental/wave/common/CudaUtil.cuh
@@ -42,10 +42,11 @@ __host__ __device__ constexpr inline T roundUp(T value, U factor) {
template <typename T>
constexpr T __device__ __host__ lowMask(int32_t bits) {
/****
* NVCC BUG: If the special case for 32 is not in, all modes except -G
* produce a 0 mask for 32 bits.
* NVCC BUG: If the special case for all bits is not in, all modes except -G
* produce a 0 mask for 32 or 64 bits.
****/
return bits == 32 ? 0xffffffff : (static_cast<T>(1) << bits) - 1;
return bits == 8 * sizeof(T) ? ~static_cast<T>(0)
: (static_cast<T>(1) << bits) - 1;
}
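
A small host-side check of the special case (lowMaskHost is a stand-in for the device function above): shifting a value by the full width of its type is undefined in C++, so the all-bits case has to return ~0 explicitly instead of relying on (1 << width) - 1.

#include <cassert>
#include <cstdint>

template <typename T>
constexpr T lowMaskHost(int32_t bits) {
  return bits == 8 * sizeof(T) ? ~static_cast<T>(0)
                               : (static_cast<T>(1) << bits) - 1;
}

int main() {
  assert(lowMaskHost<uint32_t>(0) == 0);
  assert(lowMaskHost<uint32_t>(5) == 0x1f);
  assert(lowMaskHost<uint32_t>(32) == 0xffffffffu);
  assert(lowMaskHost<uint64_t>(64) == ~0ull);
  return 0;
}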

template <typename T>
@@ -63,6 +64,16 @@ inline const T* __device__ __host__ addBytes(const T* ptr, int bytes) {
return reinterpret_cast<const T*>(reinterpret_cast<const char*>(ptr) + bytes);
}

template <typename T>
inline T* __device__ __host__ addCast(void* ptr, int bytes) {
return reinterpret_cast<T*>(reinterpret_cast<char*>(ptr) + bytes);
}

template <typename T>
inline const T* __device__ __host__ addCast(const void* ptr, int bytes) {
return reinterpret_cast<const T*>(reinterpret_cast<const char*>(ptr) + bytes);
}

__device__ __host__ inline int
memcmp(const void* lhs, const void* rhs, size_t n) {
auto* a = reinterpret_cast<const uint8_t*>(lhs);
2 changes: 1 addition & 1 deletion velox/experimental/wave/common/GpuArena.cpp
@@ -24,7 +24,7 @@
namespace facebook::velox::wave {

uint64_t GpuSlab::roundBytes(uint64_t bytes) {
return bits::nextPowerOfTwo(bytes);
return bits::nextPowerOfTwo(std::max<int64_t>(16, bytes));
}

GpuSlab::GpuSlab(void* ptr, size_t capacityBytes, GpuAllocator* allocator)
33 changes: 16 additions & 17 deletions velox/experimental/wave/common/tests/BlockTest.cu
@@ -611,38 +611,37 @@ void BlockTestStream::scatterBits(
numSource, numTarget, source, targetMask, target, temp);
}

/// Struct for tracking state between calls to nonNullIndex256 and
/// nonNullIndex256Sparse.
struct NonNullIndexState {
// Number of non-nulls below 'row'. the null flag at 'row' is not included in
// the sum.
int32_t numNonNullBelow;
int32_t row;
// Scratch memory with one int32 per warp of 256 wide TB.
int32_t temp[256 / kWarpThreads];
};

void __global__ nonNullIndexKernel(
char* nulls,
int32_t* rows,
int32_t numRows,
int32_t* indices,
int32_t* temp) {
NonNullState* state = reinterpret_cast<NonNullState*>(temp);
if (threadIdx.x == 0) {
temp[0] = countBits(reinterpret_cast<uint64_t*>(nulls), 0, rows[0]);
temp[1] = 0;
state->nonNullsBelow = 0;
state->nonNullsBelowRow = 0;
}
__syncthreads();
for (auto i = 0; i < numRows; i += blockDim.x) {
auto last = min(i + 256, numRows);
if (isDense(rows, i, last)) {
indices[i + threadIdx.x] =
nonNullIndex256(nulls, rows[i], last - i, temp, temp + 2);
nonNullIndex256(nulls, rows[i], last - i, state);
} else {
// If a non-contiguous run is followed by a contiguous run, add the
// non-nulls after between the runs to the total.
if (threadIdx.x == 0) {
int32_t nextLast = min(last + 256, numRows);
// If the next 256 rows are dense, then add the non-nulls between the
// last of the sparse and the first of the dense.
if (isDense(rows, last, nextLast)) {
temp[1] = countBits(
reinterpret_cast<uint64_t*>(nulls),
rows[last - 1] + 1,
rows[last]);
}
}
indices[i + threadIdx.x] =
nonNullIndex256Sparse(nulls, rows, i, last, temp, temp + 1, temp + 2);
nonNullIndex256Sparse(nulls, rows + i, last - i, state);
}
}
__syncthreads();
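
Finally, an illustrative harness (not the test added in this PR) for driving nonNullIndexKernel from the host: one 256-thread block, a managed null mask in which every third row is null, a dense row list, and an output slot per row rounded up to a full batch of 256. It assumes the kernel definition above is visible in this translation unit; buffer sizes and the CPU check are assumptions of this sketch.

#include <cuda_runtime.h>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Forward declaration of the kernel shown in the diff above; in the real file
// it may live inside a namespace.
__global__ void nonNullIndexKernel(
    char* nulls,
    int32_t* rows,
    int32_t numRows,
    int32_t* indices,
    int32_t* temp);

int main() {
  constexpr int32_t kNumRows = 1000;
  constexpr int32_t kPadded = 1024; // Next multiple of 256.
  char* nulls;
  int32_t *rows, *indices, *temp;
  cudaMallocManaged(&nulls, kPadded / 8);
  cudaMallocManaged(&rows, kPadded * sizeof(int32_t));
  cudaMallocManaged(&indices, kPadded * sizeof(int32_t));
  cudaMallocManaged(&temp, 64 * sizeof(int32_t)); // Room for NonNullState.
  memset(nulls, 0, kPadded / 8);
  for (int32_t i = 0; i < kNumRows; ++i) {
    rows[i] = i;
    if (i % 3 != 0) {
      nulls[i / 8] |= 1 << (i % 8); // Set bit = non-null.
    }
  }
  nonNullIndexKernel<<<1, 256>>>(nulls, rows, kNumRows, indices, temp);
  cudaDeviceSynchronize();
  int32_t expected = 0;
  for (int32_t i = 0; i < kNumRows; ++i) {
    if (i % 3 == 0) {
      if (indices[i] != -1) {
        printf("null row %d not reported as -1\n", i);
      }
    } else {
      if (indices[i] != expected) {
        printf("mismatch at row %d\n", i);
      }
      ++expected;
    }
  }
  printf("done\n");
  cudaFree(nulls);
  cudaFree(rows);
  cudaFree(indices);
  cudaFree(temp);
  return 0;
}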
