
Commit

opt(export_batch_if): Optimize the export_batch_if in cond to reduce memory wavefronts
Lifann authored and oppenheimli committed Aug 21, 2024
1 parent 4f38be5 commit 9b79bc9
Showing 5 changed files with 453 additions and 19 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
@@ -163,4 +163,8 @@ TARGET_LINK_LIBRARIES(find_with_missed_keys_test gtest_main)
add_executable(reserved_keys_test tests/reserved_keys_test.cc.cu)
target_compile_features(reserved_keys_test PUBLIC cxx_std_14)
set_target_properties(reserved_keys_test PROPERTIES CUDA_ARCHITECTURES OFF)
TARGET_LINK_LIBRARIES(reserved_keys_test gtest_main)
TARGET_LINK_LIBRARIES(reserved_keys_test gtest_main)

add_executable(export_batch_if_test tests/export_batch_if_test.cc.cu)
target_compile_features(export_batch_if_test PUBLIC cxx_std_14)
set_target_properties(export_batch_if_test PROPERTIES CUDA_ARCHITECTURES OFF)
162 changes: 162 additions & 0 deletions include/merlin/core_kernels.cuh
@@ -910,5 +910,167 @@ __global__ void dump_kernel(const Table<K, V, S>* __restrict table,
}
}

template <class K, class V, class S,
          template <typename, typename> class PredFunctor, int TILE_SIZE>
__global__ void dump_kernel_v2(const Table<K, V, S>* __restrict table,
                               Bucket<K, V, S>* buckets, const K pattern,
                               const S threshold, K* d_key, V* __restrict d_val,
                               S* __restrict d_score, const size_t offset,
                               const size_t search_length,
                               size_t* d_dump_counter) {
  const size_t bucket_max_size = table->bucket_max_size;
  int dim = table->dim;
  auto g = cg::tiled_partition<TILE_SIZE>(cg::this_thread_block());

  PredFunctor<K, S> pred;
  size_t tid = static_cast<size_t>(blockIdx.x * blockDim.x + threadIdx.x);

  for (size_t ii = tid; ii < search_length; ii += gridDim.x * blockDim.x) {
    size_t bkt_idx = (ii + offset) / bucket_max_size;
    size_t key_idx = (ii + offset) % bucket_max_size;
    size_t leading_key_idx = key_idx / TILE_SIZE * TILE_SIZE;
    Bucket<K, V, S>* bucket = &(buckets[bkt_idx]);

    const K key =
        (bucket->keys(key_idx))->load(cuda::std::memory_order_relaxed);
    S score = bucket->scores(key_idx)->load(cuda::std::memory_order_relaxed);

    bool match =
        (!IS_RESERVED_KEY<K>(key)) && pred(key, score, pattern, threshold);
    unsigned int vote = g.ballot(match);
    int tile_cnt = __popc(vote);
    size_t tile_offset = 0;
    if (g.thread_rank() == 0) {
      tile_offset = atomicAdd(d_dump_counter, static_cast<size_t>(tile_cnt));
    }
    tile_offset = g.shfl(tile_offset, 0);
    int bias_g = tile_cnt - __popc(vote >> (key_idx % TILE_SIZE));

    if (match) {
      d_key[tile_offset + bias_g] = key;
      if (d_score) {
        d_score[tile_offset + bias_g] = score;
      }
    }

#pragma unroll
    for (int r = 0; r < TILE_SIZE; r++) {
      unsigned int biased_vote = vote >> r;
      bool cur_match = biased_vote & 1;
      if (cur_match) {
        int bias = tile_cnt - __popc(biased_vote);
        size_t cur_idx = leading_key_idx + r;

        for (int j = g.thread_rank(); j < dim; j += TILE_SIZE) {
          d_val[(tile_offset + bias) * dim + j] =
              bucket->vectors[cur_idx * dim + j];
        }
      }
    }
  }
}

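For context, the pattern `dump_kernel_v2` relies on is tile-level stream compaction: one ballot and a single `atomicAdd` per tile reserve a contiguous output range, and each matching lane derives its slot from the ballot mask, so no shared-memory staging buffer is needed. A minimal standalone sketch of that pattern, with hypothetical names and plain warp intrinsics instead of cooperative groups:

```cuda
// Standalone sketch of the ballot/popc compaction used above; names are
// hypothetical. Launch with blockDim.x a multiple of 32 so every warp is full.
__global__ void compact_positive(const int* in, int n, int* out,
                                 unsigned long long* d_counter) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  int lane = threadIdx.x % 32;

  bool match = (i < n) && (in[i] > 0);
  unsigned mask = __ballot_sync(0xffffffffu, match);  // one bit per lane
  int warp_cnt = __popc(mask);                        // matches in this warp

  unsigned long long base = 0;
  if (lane == 0 && warp_cnt > 0) {
    // One atomic per warp reserves a contiguous output range.
    base = atomicAdd(d_counter, (unsigned long long)warp_cnt);
  }
  base = __shfl_sync(0xffffffffu, base, 0);  // broadcast the warp's base offset

  if (match) {
    int bias = __popc(mask & ((1u << lane) - 1));  // matches in lower lanes
    out[base + bias] = in[i];
  }
}
```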
template <class K, class V, class S,
          template <typename, typename> class PredFunctor, int TILE_SIZE>
__global__ void dump_kernel_v2_vectorized(const Table<K, V, S>* __restrict table,
                                          Bucket<K, V, S>* buckets, const K pattern,
                                          const S threshold, K* d_key, V* __restrict d_val,
                                          S* __restrict d_score, const size_t offset,
                                          const size_t search_length,
                                          size_t* d_dump_counter) {
  const size_t bucket_max_size = table->bucket_max_size;
  int dim = table->dim;
  auto g = cg::tiled_partition<TILE_SIZE>(cg::this_thread_block());

  PredFunctor<K, S> pred;
  size_t tid = static_cast<size_t>(blockIdx.x * blockDim.x + threadIdx.x);

  for (size_t ii = tid; ii < search_length; ii += gridDim.x * blockDim.x) {
    size_t bkt_idx = (ii + offset) / bucket_max_size;
    size_t key_idx = (ii + offset) % bucket_max_size;
    size_t leading_key_idx = key_idx / TILE_SIZE * TILE_SIZE;
    Bucket<K, V, S>* bucket = &(buckets[bkt_idx]);

    const K key =
        (bucket->keys(key_idx))->load(cuda::std::memory_order_relaxed);
    S score = bucket->scores(key_idx)->load(cuda::std::memory_order_relaxed);

    bool match =
        (!IS_RESERVED_KEY<K>(key)) && pred(key, score, pattern, threshold);
    unsigned int vote = g.ballot(match);
    int tile_cnt = __popc(vote);
    size_t tile_offset = 0;
    if (g.thread_rank() == 0) {
      tile_offset = atomicAdd(d_dump_counter, static_cast<size_t>(tile_cnt));
    }
    tile_offset = g.shfl(tile_offset, 0);
    int bias_g = tile_cnt - __popc(vote >> (key_idx % TILE_SIZE));

    if (match) {
      d_key[tile_offset + bias_g] = key;
      if (d_score) {
        d_score[tile_offset + bias_g] = score;
      }
    }

#pragma unroll
    for (int r = 0; r < TILE_SIZE; r++) {
      unsigned int biased_vote = vote >> r;
      bool cur_match = biased_vote & 1;
      if (cur_match) {
        int bias = tile_cnt - __popc(biased_vote);
        size_t cur_idx = leading_key_idx + r;

        float4* d_val_fp4 = reinterpret_cast<float4*>(d_val);
        float4* vec_fp4 = reinterpret_cast<float4*>(bucket->vectors);
        int d4 = dim / 4;
        for (int j = g.thread_rank(); j < d4; j += TILE_SIZE) {
          d_val_fp4[(tile_offset + bias) * d4 + j] = vec_fp4[cur_idx * d4 + j];
        }
      }
    }
  }
}

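The `_vectorized` variant differs only in the copy loop: when the value type is float-sized and `dim` is a multiple of 4, it reinterprets the value rows as `float4` so each thread moves 16 bytes per transaction. A hedged standalone sketch of just that copy; 16-byte alignment of the base pointers is an assumption here, not something the kernel checks:

```cuda
// Sketch of the 128-bit copy path (assumptions: values are float, dim % 4 == 0,
// and both base pointers are 16-byte aligned, as cudaMalloc'd buffers are).
__global__ void copy_rows_float4(const float* __restrict src,
                                 float* __restrict dst, int dim, int n_rows) {
  const float4* src4 = reinterpret_cast<const float4*>(src);
  float4* dst4 = reinterpret_cast<float4*>(dst);
  int d4 = dim / 4;  // four floats per float4 transaction
  for (int row = blockIdx.x; row < n_rows; row += gridDim.x) {
    for (int j = threadIdx.x; j < d4; j += blockDim.x) {
      dst4[row * d4 + j] = src4[row * d4 + j];  // one 16-byte load + one store
    }
  }
}
```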
template <class K, class V, class S,
          template <typename, typename> class PredFunctor>
__global__ void size_if_kernel(const Table<K, V, S>* __restrict table,
                               Bucket<K, V, S>* buckets, const K pattern,
                               const S threshold, size_t* d_counter) {
  extern __shared__ unsigned char s[];
  KVM<K, V, S>* const block_tuples{reinterpret_cast<KVM<K, V, S>*>(s)};

  const size_t bucket_max_size{table->bucket_max_size};

  size_t local_acc = 0;
  __shared__ size_t block_acc;
  PredFunctor<K, S> pred;

  const size_t tid{blockIdx.x * blockDim.x + threadIdx.x};

  if (threadIdx.x == 0) {
    block_acc = 0;
  }
  __syncthreads();

  for (size_t i = tid; i < table->capacity; i += blockDim.x * gridDim.x) {
    Bucket<K, V, S>* const bucket{&buckets[i / bucket_max_size]};

    const int key_idx{static_cast<int>(i % bucket_max_size)};
    const K key{(bucket->keys(key_idx))->load(cuda::std::memory_order_relaxed)};
    S score = bucket->scores(key_idx)->load(cuda::std::memory_order_relaxed);

    if ((!IS_RESERVED_KEY(key)) && pred(key, score, pattern, threshold)) {
      ++local_acc;
    }
  }
  atomicAdd(&block_acc, local_acc);
  __syncthreads();

  if (threadIdx.x == 0) {
    atomicAdd(d_counter, block_acc);
  }
}

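`size_if_kernel` keeps atomic traffic low by counting at three levels: a per-thread register, a per-block shared counter, and a single global `atomicAdd` per block. The same pattern in a minimal standalone form, with hypothetical names and a generic predicate:

```cuda
// Minimal standalone sketch of the three-level counting pattern
// (register -> shared -> global); names are hypothetical.
__global__ void count_greater(const float* data, size_t n, float threshold,
                              unsigned long long* d_counter) {
  __shared__ unsigned long long block_acc;
  if (threadIdx.x == 0) block_acc = 0;
  __syncthreads();

  unsigned long long local_acc = 0;  // per-thread register, no contention
  for (size_t i = (size_t)blockIdx.x * blockDim.x + threadIdx.x; i < n;
       i += (size_t)blockDim.x * gridDim.x) {
    if (data[i] > threshold) ++local_acc;
  }

  atomicAdd(&block_acc, local_acc);  // one shared-memory atomic per thread
  __syncthreads();
  if (threadIdx.x == 0) {
    atomicAdd(d_counter, block_acc);  // one global atomic per block
  }
}
```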
} // namespace merlin
} // namespace nv
108 changes: 94 additions & 14 deletions include/merlin_hashtable.cuh
@@ -917,6 +917,8 @@ class HashTable : public HashTableBase<K, V, S> {
    cudaDeviceProp deviceProp;
    CUDA_CHECK(cudaGetDeviceProperties(&deviceProp, options_.device_id));
    shared_mem_size_ = deviceProp.sharedMemPerBlock;
    sm_cnt_ = deviceProp.multiProcessorCount;
    max_threads_per_block_ = deviceProp.maxThreadsPerBlock;
    create_table<key_type, value_type, score_type>(
        &table_, allocator_, options_.dim, options_.init_capacity,
        options_.max_capacity, options_.max_hbm_for_vectors,
@@ -2611,22 +2613,76 @@ class HashTable : public HashTableBase<K, V, S> {
      return;
    }
    n = std::min(table_->capacity - offset, n);
    if (n == 0) {
      return;
    }

    const size_t score_size = scores ? sizeof(score_type) : 0;
    const size_t kvm_size =
        sizeof(key_type) + sizeof(value_type) * dim() + score_size;
    const size_t block_size = std::min(shared_mem_size_ / 2 / kvm_size, 1024UL);
    MERLIN_CHECK(
        block_size > 0,
        "[HierarchicalKV] block_size <= 0, the K-V-S size may be too large!");

    const size_t shared_size = kvm_size * block_size;
    const size_t grid_size = SAFE_GET_GRID_SIZE(n, block_size);
    bool match_fast_cond = options_.max_bucket_size % TILE_SIZE == 0 &&
                           options_.max_bucket_size >= TILE_SIZE &&
                           offset % TILE_SIZE == 0 && n % TILE_SIZE == 0;

    if (match_fast_cond) {
      int grid_size = std::min(
          sm_cnt_ * max_threads_per_block_ / options_.block_size,
          static_cast<int>(SAFE_GET_GRID_SIZE(n, options_.block_size)));
      if (sizeof(V) == sizeof(float) && dim() >= 32 && dim() % 4 == 0) {
        if (dim() >= 128) {
          const int TILE_SIZE = 32;
          dump_kernel_v2_vectorized<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        } else if (dim() >= 64) {
          const int TILE_SIZE = 16;
          dump_kernel_v2_vectorized<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        } else {
          const int TILE_SIZE = 8;
          dump_kernel_v2_vectorized<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        }
      } else {
        if (dim() >= 32) {
          const int TILE_SIZE = 32;
          dump_kernel_v2<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        } else if (dim() >= 16) {
          const int TILE_SIZE = 16;
          dump_kernel_v2<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        } else {
          const int TILE_SIZE = 8;
          dump_kernel_v2<key_type, value_type, score_type, PredFunctor, TILE_SIZE>
              <<<grid_size, options_.block_size, 0, stream>>>(
                  d_table_, table_->buckets, pattern, threshold, keys, values,
                  scores, offset, n, d_counter);
        }
      }
    } else {
      const size_t score_size = scores ? sizeof(score_type) : 0;
      const size_t kvm_size =
          sizeof(key_type) + sizeof(value_type) * dim() + score_size;
      const size_t block_size =
          std::min(shared_mem_size_ / 2 / kvm_size, 1024UL);
      MERLIN_CHECK(
          block_size > 0,
          "[HierarchicalKV] block_size <= 0, the K-V-S size may be too large!");

    dump_kernel<key_type, value_type, score_type, PredFunctor>
        <<<grid_size, block_size, shared_size, stream>>>(
            d_table_, table_->buckets, pattern, threshold, keys, values, scores,
            offset, n, d_counter);
      const size_t shared_size = kvm_size * block_size;
      const size_t grid_size = SAFE_GET_GRID_SIZE(n, block_size);
      dump_kernel<key_type, value_type, score_type, PredFunctor>
          <<<grid_size, block_size, shared_size, stream>>>(
              d_table_, table_->buckets, pattern, threshold, keys, values,
              scores, offset, n, d_counter);
    }

    CudaCheckError();
  }
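The fast path also caps the grid at `sm_cnt_ * max_threads_per_block_ / block_size`, so the launch stays close to what the GPU can keep resident while each thread grid-strides over the remaining slots. A small worked example of that cap, with assumed device numbers and `SAFE_GET_GRID_SIZE` approximated as a ceiling division:

```cpp
// Worked example of the fast-path grid cap; the numbers are assumptions
// (108 SMs, maxThreadsPerBlock == 1024, options_.block_size == 128).
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  int sm_cnt = 108;                  // deviceProp.multiProcessorCount (assumed)
  int max_threads_per_block = 1024;  // deviceProp.maxThreadsPerBlock (assumed)
  int block_size = 128;              // options_.block_size (assumed)
  std::size_t n = 16u << 20;         // number of slots to scan (assumed)

  int by_work = static_cast<int>((n + block_size - 1) / block_size);  // 131072
  int by_occupancy = sm_cnt * max_threads_per_block / block_size;     // 864
  int grid_size = std::min(by_occupancy, by_work);

  std::printf("grid_size = %d blocks\n", grid_size);  // prints 864
  return 0;
}
```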
@@ -2668,6 +2724,28 @@ class HashTable : public HashTableBase<K, V, S> {
    return h_size;
  }

  /**
   * @brief Counts the number of keys that match PredFunctor.
   *
   * The number of matching keys is written to `d_counter` asynchronously on
   * `stream`.
   *
   * @param stream The CUDA stream that is used to execute the operation.
   */
  template <template <typename, typename> class PredFunctor>
  void size_if(const key_type& pattern, const score_type& threshold,
               size_type* d_counter, cudaStream_t stream = 0) const {
    read_shared_lock lock(mutex_, stream);
    CUDA_CHECK(cudaMemsetAsync(d_counter, 0, sizeof(size_type), stream));

    size_t grid_size = SAFE_GET_GRID_SIZE(capacity(), options_.block_size);
    grid_size = std::min(grid_size,
                         static_cast<size_t>(sm_cnt_ * max_threads_per_block_ /
                                             options_.block_size));
    size_if_kernel<key_type, value_type, score_type, PredFunctor>
        <<<grid_size, options_.block_size, 0, stream>>>(
            d_table_, table_->buckets, pattern, threshold, d_counter);
    CudaCheckError();
  }

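A hedged host-side usage sketch for the new `size_if`; the table template arguments, the `ScoreAbove` functor, and `count_hot_keys` are illustrative, not part of this commit. The counter must live in device memory because `size_if_kernel` accumulates into it with `atomicAdd`:

```cpp
// Hypothetical usage of size_if (names and template arguments are illustrative).
#include <cstdint>
#include <cstdio>
#include "merlin_hashtable.cuh"

template <typename K, typename S>
struct ScoreAbove {  // matches the PredFunctor call pred(key, score, pattern, threshold)
  __device__ bool operator()(const K& key, const S& score, const K& pattern,
                             const S& threshold) {
    return score > threshold;
  }
};

void count_hot_keys(nv::merlin::HashTable<uint64_t, float, uint64_t>& table,
                    cudaStream_t stream) {
  size_t* d_count = nullptr;
  CUDA_CHECK(cudaMalloc(&d_count, sizeof(size_t)));  // device-resident counter

  table.size_if<ScoreAbove>(/*pattern=*/0, /*threshold=*/100, d_count, stream);

  size_t h_count = 0;
  CUDA_CHECK(cudaMemcpyAsync(&h_count, d_count, sizeof(size_t),
                             cudaMemcpyDeviceToHost, stream));
  CUDA_CHECK(cudaStreamSynchronize(stream));
  CUDA_CHECK(cudaFree(d_count));
  std::printf("matching keys: %zu\n", h_count);
}
```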
  /**
   * @brief Returns the hash table capacity.
   *
@@ -3037,6 +3115,8 @@ class HashTable : public HashTableBase<K, V, S> {
  TableCore* table_ = nullptr;
  TableCore* d_table_ = nullptr;
  size_t shared_mem_size_ = 0;
  int sm_cnt_ = 0;
  int max_threads_per_block_ = 0;
  std::atomic_bool reach_max_capacity_{false};
  bool initialized_ = false;
  mutable group_shared_mutex mutex_;
