From 6101f96f533e4d15f9b136c2dccf622b99963044 Mon Sep 17 00:00:00 2001 From: rhdong Date: Wed, 8 Feb 2023 16:18:55 +0800 Subject: [PATCH] [Fix] Evict error when bucket is full - Switch the `metas` to Atomic type - Add more UT cases for `meta` mechanism - Update benchmark --- README.md | 49 +- include/merlin/core_kernels.cuh | 215 +++++++-- include/merlin/types.cuh | 32 +- tests/merlin_hashtable_test.cc.cu | 742 +++++++++++++++++++++++++++++- tests/test_util.cuh | 10 + 5 files changed, 951 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 8db0e2fe0..81a12c54e 100644 --- a/README.md +++ b/README.md @@ -91,35 +91,36 @@ Your environment must meet the following requirements: | dim | capacity | load_factor | HBM(GB) | HMEM(GB) | insert | find | erase | |----:|------------:|------------:|--------:|---------:|-------:|-------:|-------:| -| 4 | 67108864 | 0.50 | 32 | 0 | 1.756 | 3.043 | 4.157 | -| 4 | 67108864 | 0.75 | 32 | 0 | 1.036 | 1.998 | 2.867 | -| 4 | 67108864 | 1.00 | 32 | 0 | 0.919 | 1.132 | 0.558 | -| 16 | 67108864 | 0.50 | 16 | 0 | 1.397 | 2.220 | 4.335 | -| 16 | 67108864 | 0.75 | 16 | 0 | 1.012 | 1.393 | 2.940 | -| 16 | 67108864 | 1.00 | 16 | 0 | 0.924 | 1.135 | 0.559 | -| 64 | 67108864 | 0.50 | 16 | 0 | 0.736 | 0.939 | 4.276 | -| 64 | 67108864 | 0.75 | 16 | 0 | 0.716 | 0.688 | 2.911 | -| 64 | 67108864 | 1.00 | 16 | 0 | 0.922 | 1.135 | 0.561 | -| 128 | 134217728 | 0.50 | 64 | 0 | 0.406 | 0.559 | 3.821 | -| 128 | 134217728 | 0.75 | 64 | 0 | 0.565 | 0.435 | 2.708 | -| 128 | 134217728 | 1.00 | 64 | 0 | 0.824 | 1.114 | 0.549 | +| 4 | 67108864 | 0.50 | 32 | 0 | 1.620 | 3.156 | 4.026 | +| 4 | 67108864 | 0.75 | 32 | 0 | 1.052 | 2.159 | 2.901 | +| 4 | 67108864 | 1.00 | 32 | 0 | 0.225 | 0.891 | 0.805 | +| 16 | 67108864 | 0.50 | 16 | 0 | 1.402 | 2.237 | 4.219 | +| 16 | 67108864 | 0.75 | 16 | 0 | 0.999 | 1.636 | 2.863 | +| 16 | 67108864 | 1.00 | 16 | 0 | 0.224 | 0.856 | 0.819 | +| 64 | 67108864 | 0.50 | 16 | 0 | 0.686 | 0.859 | 4.200 | +| 64 | 67108864 | 0.75 | 16 | 0 | 0.637 | 0.705 | 2.911 | +| 64 | 67108864 | 1.00 | 16 | 0 | 0.205 | 0.718 | 0.804 | +| 128 | 134217728 | 0.50 | 64 | 0 | 0.403 | 0.472 | 3.783 | +| 128 | 134217728 | 0.75 | 64 | 0 | 0.445 | 0.425 | 2.684 | +| 128 | 134217728 | 1.00 | 64 | 0 | 0.183 | 0.557 | 0.766 | ### On HBM+HMEM hybrid mode: | dim | capacity | load_factor | HBM(GB) | HMEM(GB) | insert | find | erase | |----:|------------:|------------:|--------:|---------:|-------:|-------:|-------:| -| 64 | 134217728 | 0.50 | 16 | 16 | 0.111 | 0.112 | 3.659 | -| 64 | 134217728 | 0.75 | 16 | 16 | 0.110 | 0.110 | 2.574 | -| 64 | 134217728 | 1.00 | 16 | 16 | 0.338 | 0.293 | 0.543 | -| 64 | 1073741824 | 0.50 | 56 | 200 | 0.038 | 0.041 | 2.372 | -| 64 | 1073741824 | 0.75 | 56 | 200 | 0.036 | 0.041 | 2.056 | -| 64 | 1073741824 | 1.00 | 56 | 200 | 0.295 | 0.274 | 0.533 | -| 128 | 67108864 | 0.50 | 16 | 16 | 0.063 | 0.068 | 4.062 | -| 128 | 67108864 | 0.75 | 16 | 16 | 0.065 | 0.067 | 2.765 | -| 128 | 67108864 | 1.00 | 16 | 16 | 0.316 | 0.260 | 0.547 | -| 128 | 536870912 | 0.50 | 56 | 200 | 0.037 | 0.041 | 2.789 | -| 128 | 536870912 | 0.75 | 56 | 200 | 0.037 | 0.041 | 2.175 | -| 128 | 536870912 | 1.00 | 56 | 200 | 0.290 | 0.255 | 0.534 | +| 64 | 134217728 | 0.50 | 16 | 16 | 0.105 | 0.125 | 3.620 | +| 64 | 134217728 | 0.75 | 16 | 16 | 0.107 | 0.123 | 2.610 | +| 64 | 134217728 | 1.00 | 16 | 16 | 0.073 | 0.112 | 0.770 | +| 64 | 1073741824 | 0.50 | 56 | 200 | 0.037 | 0.042 | 2.362 | +| 64 | 1073741824 | 0.75 | 56 | 200 | 0.037 | 0.042 | 2.106 | +| 64 | 1073741824 | 1.00 | 56 | 200 | 0.031 | 0.040 | 0.727 | +| 128 | 67108864 | 0.50 | 16 | 16 | 0.064 | 0.072 | 4.039 | +| 128 | 67108864 | 0.75 | 16 | 16 | 0.069 | 0.071 | 2.834 | +| 128 | 67108864 | 1.00 | 16 | 16 | 0.052 | 0.067 | 0.805 | +| 128 | 536870912 | 0.50 | 56 | 200 | 0.037 | 0.042 | 2.866 | +| 128 | 536870912 | 0.75 | 56 | 200 | 0.038 | 0.043 | 2.225 | +| 128 | 536870912 | 1.00 | 56 | 200 | 0.033 | 0.041 | 0.734 | + ### Support and Feedback: diff --git a/include/merlin/core_kernels.cuh b/include/merlin/core_kernels.cuh index 7d96e398f..c811614e3 100644 --- a/include/merlin/core_kernels.cuh +++ b/include/merlin/core_kernels.cuh @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -92,6 +93,24 @@ __global__ void create_atomic_keys(Bucket* __restrict buckets, } } +template +__global__ void create_atomic_metas(Bucket* __restrict buckets, + const size_t start, const size_t end, + const size_t bucket_max_size) { + size_t tid = (blockIdx.x * blockDim.x) + threadIdx.x; + if (start + tid < end) { + for (size_t i = 0; i < bucket_max_size; i++) { + new (&(buckets[start + tid].metas[i])) + AtomicMeta{static_cast(EMPTY_META)}; + } + new (&(buckets[start + tid].cur_meta)) + AtomicMeta{static_cast(EMPTY_META)}; + new (&(buckets[start + tid].min_meta)) + AtomicMeta{static_cast(EMPTY_META)}; + new (&(buckets[start + tid].min_pos)) AtomicPos{1}; + } +} + /* 2GB per slice by default.*/ constexpr size_t kDefaultBytesPerSlice = (8ul << 30); @@ -150,7 +169,7 @@ void initialize_buckets(Table** table, const size_t start, CUDA_CHECK(cudaMalloc(&((*table)->buckets[i].keys), (*table)->bucket_max_size * sizeof(AtomicKey))); CUDA_CHECK(cudaMalloc(&((*table)->buckets[i].metas), - (*table)->bucket_max_size * sizeof(Meta))); + (*table)->bucket_max_size * sizeof(AtomicMeta))); } { @@ -167,6 +186,14 @@ void initialize_buckets(Table** table, const size_t start, create_atomic_keys<<>>( (*table)->buckets, start, end, (*table)->bucket_max_size); } + + { + const size_t block_size = 512; + const size_t N = end - start + 1; + const int grid_size = SAFE_GET_GRID_SIZE(N, block_size); + create_atomic_metas<<>>( + (*table)->buckets, start, end, (*table)->bucket_max_size); + } CudaCheckError(); } @@ -292,7 +319,7 @@ __forceinline__ __device__ void defragmentation_for_rehash( while (i < bucket_max_size) { key_idx = (remove_pos + i) & (bucket_max_size - 1); find_key = bucket->keys[key_idx].load(cuda::std::memory_order_relaxed); - if (find_key == EMPTY_KEY) { + if (find_key == static_cast(EMPTY_KEY)) { break; } hashed_key = Murmur3HashDevice(find_key); @@ -305,13 +332,16 @@ __forceinline__ __device__ void defragmentation_for_rehash( const K key = (*(bucket->keys + key_idx)).load(cuda::std::memory_order_relaxed); (*(bucket->keys + empty_pos)).store(key, cuda::std::memory_order_relaxed); - bucket->metas[empty_pos].val = bucket->metas[key_idx].val; + const M meta = + (*(bucket->metas + key_idx)).load(cuda::std::memory_order_relaxed); + (*(bucket->metas + empty_pos)) + .store(meta, cuda::std::memory_order_relaxed); for (int j = 0; j < dim; j++) { bucket->vectors[empty_pos * dim + j] = bucket->vectors[key_idx * dim + j]; } (*(bucket->keys + key_idx)) - .store(EMPTY_KEY, cuda::std::memory_order_relaxed); + .store(static_cast(EMPTY_KEY), cuda::std::memory_order_relaxed); empty_pos = key_idx; remove_pos = key_idx; i = 1; @@ -330,18 +360,20 @@ __forceinline__ __device__ void refresh_bucket_meta( for (int i = g.thread_rank(); i < bucket_max_size; i += TILE_SIZE) { const K key = bucket->keys[i].load(cuda::std::memory_order_relaxed); - if (key == EMPTY_KEY) { + if (key == static_cast(EMPTY_KEY) || + key == static_cast(RECLAIM_KEY)) { continue; } - if (bucket->metas[i].val < min_val) { + const M meta = bucket->metas[i].load(cuda::std::memory_order_acquire); + if (meta < min_val) { min_pos = i; - min_val = bucket->metas[i].val; + min_val = meta; } } M global_min_val = cg::reduce(g, min_val, cg::less()); if (min_val == global_min_val) { - bucket->min_pos = min_pos; - bucket->min_meta = min_val; + bucket->min_pos.store(min_pos, cuda::std::memory_order_release); + bucket->min_meta.store(min_val, cuda::std::memory_order_release); } } @@ -352,6 +384,12 @@ __device__ __forceinline__ void copy_vector( for (auto i = g.thread_rank(); i < dim; i += g.size()) { dst[i] = src[i]; } + + // cuda::barrier bar; + // init(&bar, 1); + // cuda::memcpy_async(g, dst, src, dim * sizeof(V), bar); + // + // bar.arrive_and_wait(); } /* Write the N data from src to each address in *dst by using CPU threads, @@ -412,7 +450,7 @@ __forceinline__ __device__ void move_key_to_new_bucket( (new_start_idx + tile_offset + rank) & (bucket_max_size - 1); const K current_key = (*(new_bucket->keys + key_offset)) .load(cuda::std::memory_order_relaxed); - empty_vote = g.ballot(current_key == EMPTY_KEY); + empty_vote = g.ballot(current_key == static_cast(EMPTY_KEY)); if (empty_vote) { src_lane = __ffs(empty_vote) - 1; key_pos = @@ -420,7 +458,7 @@ __forceinline__ __device__ void move_key_to_new_bucket( local_size = buckets_size[new_bkt_idx]; if (rank == src_lane) { new_bucket->keys[key_pos].store(key, cuda::std::memory_order_relaxed); - new_bucket->metas[key_pos].val = meta; + new_bucket->metas[key_pos].store(meta, cuda::std::memory_order_relaxed); atomicAdd(&(buckets_size[new_bkt_idx]), 1); } local_size = g.shfl(local_size, src_lane); @@ -449,6 +487,7 @@ __global__ void rehash_kernel_for_fast_mode( size_t global_idx; uint32_t start_idx = 0; K target_key = 0; + M target_meta = 0; for (size_t t = tid; t < N; t += blockDim.x * gridDim.x) { uint32_t bkt_idx = t / TILE_SIZE; @@ -459,19 +498,22 @@ __global__ void rehash_kernel_for_fast_mode( while (key_idx < bucket_max_size) { key_idx = g.shfl(key_idx, 0); target_key = bucket->keys[key_idx].load(cuda::std::memory_order_relaxed); - if (target_key != EMPTY_KEY && target_key != RECLAIM_KEY) { + target_meta = + bucket->metas[key_idx].load(cuda::std::memory_order_relaxed); + if (target_key != static_cast(EMPTY_KEY) && + target_key != static_cast(RECLAIM_KEY)) { K hashed_key = Murmur3HashDevice(target_key); global_idx = hashed_key & (buckets_num * bucket_max_size - 1); uint32_t new_bkt_idx = global_idx / bucket_max_size; if (new_bkt_idx != bkt_idx) { start_idx = global_idx % bucket_max_size; move_key_to_new_bucket( - g, rank, target_key, bucket->metas[key_idx].val, + g, rank, target_key, target_meta, (bucket->vectors + key_idx * dim), buckets + new_bkt_idx, new_bkt_idx, start_idx, buckets_size, bucket_max_size, buckets_num, table->dim); if (rank == 0) { - bucket->keys[key_idx].store(EMPTY_KEY, + bucket->keys[key_idx].store(static_cast(EMPTY_KEY), cuda::std::memory_order_relaxed); atomicSub(&(buckets_size[bkt_idx]), 1); defragmentation_for_rehash( @@ -669,7 +711,7 @@ __device__ __forceinline__ unsigned find_in_bucket( return found_vote; } - if (g.any(current_key == EMPTY_KEY)) { + if (g.any(current_key == static_cast(EMPTY_KEY))) { return 0; } } @@ -714,17 +756,43 @@ __forceinline__ __device__ void update_meta(Bucket* __restrict bucket, const int key_pos, const M* __restrict metas, const int key_idx) { - if (bucket->metas == nullptr) return; if (metas == nullptr) { - M cur_meta = bucket->cur_meta + 1; - bucket->cur_meta = cur_meta; - bucket->metas[key_pos].val = cur_meta; + M cur_meta = bucket->cur_meta.fetch_add(1) + 1; + bucket->metas[key_pos].store(cur_meta, cuda::std::memory_order_relaxed); } else { - bucket->metas[key_pos].val = metas[key_idx]; + bucket->metas[key_pos].store(metas[key_idx], + cuda::std::memory_order_relaxed); } return; } +template +__forceinline__ __device__ OverrideResult try_override_min_meta( + Bucket* __restrict bucket, const int key_pos, M min_meta, + const M* __restrict metas, const int key_idx) { + if (metas == nullptr) { + const M cur_meta = bucket->cur_meta.load(cuda::std::memory_order_relaxed); + if (bucket->metas[key_pos].compare_exchange_strong( + min_meta, cur_meta + 1, cuda::std::memory_order_relaxed)) { + bucket->cur_meta.fetch_add(1, cuda::std::memory_order_relaxed); + return OverrideResult::SUCCESS; + } else { + return OverrideResult::CONTINUE; + } + } else { + if (metas[key_idx] < min_meta) { + return OverrideResult::REFUSED; + } + if (bucket->metas[key_pos].compare_exchange_strong( + min_meta, metas[key_idx], cuda::std::memory_order_relaxed)) { + return OverrideResult::SUCCESS; + } else { + return OverrideResult::CONTINUE; + } + } + return OverrideResult::CONTINUE; +} + template __forceinline__ __device__ void upsert_kernel_with_io_core( const Table* __restrict table, const K* __restrict keys, @@ -757,9 +825,10 @@ __forceinline__ __device__ void upsert_kernel_with_io_core( bucket = get_key_position(buckets, insert_key, bkt_idx, start_idx, buckets_num, bucket_max_size); + local_size = buckets_size[bkt_idx]; + found_vote = find_in_bucket( g, bucket->keys, insert_key, tile_offset, start_idx, bucket_max_size); - if (found_vote) { src_lane = __ffs(found_vote) - 1; key_pos = (start_idx + tile_offset + src_lane) & (bucket_max_size - 1); @@ -778,7 +847,6 @@ __forceinline__ __device__ void upsert_kernel_with_io_core( } tile_offset = 0; - local_size = buckets_size[bkt_idx]; OccupyResult occupy_result{OccupyResult::INITIAL}; while (tile_offset < bucket_max_size && local_size < bucket_max_size) { @@ -827,9 +895,9 @@ __forceinline__ __device__ void upsert_kernel_with_io_core( (start_idx + tile_offset + src_lane) & (bucket_max_size - 1); if (rank == src_lane) { update_meta(bucket, key_pos, metas, key_idx); - atomicAdd(&(buckets_size[bkt_idx]), 1); + local_size = atomicAdd(&(buckets_size[bkt_idx]), 1) + 1; } - local_size++; + local_size = g.shfl(local_size, src_lane); if (local_size >= bucket_max_size) { refresh_bucket_meta(g, bucket, bucket_max_size); @@ -848,20 +916,39 @@ __forceinline__ __device__ void upsert_kernel_with_io_core( tile_offset += TILE_SIZE; } - if (occupy_result == OccupyResult::CONTINUE) { - src_lane = (bucket->min_pos % TILE_SIZE); - key_pos = bucket->min_pos; + while (occupy_result == OccupyResult::CONTINUE || + occupy_result == OccupyResult::INITIAL) { + key_pos = bucket->min_pos.load(cuda::std::memory_order_acquire); + src_lane = (key_pos % TILE_SIZE); + OverrideResult override_result{OverrideResult::INITIAL}; + + if (rank == src_lane) { + const M min_meta = + bucket->min_meta.load(cuda::std::memory_order_acquire); + override_result = + try_override_min_meta(bucket, key_pos, min_meta, metas, key_idx); + } + override_result = g.shfl(override_result, src_lane); + if (override_result == OverrideResult::REFUSED) break; + + if (override_result == OverrideResult::CONTINUE) { + refresh_bucket_meta(g, bucket, bucket_max_size); + continue; + } + // override_result == OverrideResult::SUCCESS if (rank == src_lane) { bucket->keys[key_pos].store(insert_key, cuda::std::memory_order_relaxed); - update_meta(bucket, key_pos, metas, key_idx); } + refresh_bucket_meta(g, bucket, bucket_max_size); + lock(g, table->locks[bkt_idx]); copy_vector(g, insert_value, bucket->vectors + key_pos * dim, dim); unlock(g, table->locks[bkt_idx]); + break; } } } @@ -1040,10 +1127,11 @@ __global__ void upsert_kernel(const Table* __restrict table, (start_idx + tile_offset + src_lane) & (bucket_max_size - 1); if (rank == src_lane) { update_meta(bucket, key_pos, metas, key_idx); - atomicAdd(&(buckets_size[bkt_idx]), 1); + local_size = atomicAdd(&(buckets_size[bkt_idx]), 1) + 1; *(vectors + key_idx) = (bucket->vectors + key_pos * dim); } - local_size++; + local_size = g.shfl(local_size, src_lane); + ; if (local_size >= bucket_max_size) { refresh_bucket_meta(g, bucket, bucket_max_size); @@ -1058,17 +1146,35 @@ __global__ void upsert_kernel(const Table* __restrict table, tile_offset += TILE_SIZE; } - if (occupy_result == OccupyResult::CONTINUE) { - src_lane = (bucket->min_pos % TILE_SIZE); - key_pos = bucket->min_pos; + while (occupy_result == OccupyResult::CONTINUE || + occupy_result == OccupyResult::INITIAL) { + key_pos = bucket->min_pos.load(cuda::std::memory_order_acquire); + src_lane = (key_pos % TILE_SIZE); + OverrideResult override_result{OverrideResult::INITIAL}; + + if (rank == src_lane) { + const M min_meta = + bucket->min_meta.load(cuda::std::memory_order_acquire); + override_result = + try_override_min_meta(bucket, key_pos, min_meta, metas, key_idx); + } + override_result = g.shfl(override_result, src_lane); + if (override_result == OverrideResult::REFUSED) break; + + if (override_result == OverrideResult::CONTINUE) { + refresh_bucket_meta(g, bucket, bucket_max_size); + continue; + } + // override_result == OverrideResult::SUCCESS if (rank == src_lane) { bucket->keys[key_pos].store(insert_key, cuda::std::memory_order_relaxed); *(vectors + key_idx) = (bucket->vectors + key_pos * dim); - update_meta(bucket, key_pos, metas, key_idx); } + refresh_bucket_meta(g, bucket, bucket_max_size); + break; } } } @@ -1116,8 +1222,8 @@ __global__ void accum_kernel( (start_idx + tile_offset + rank) & (bucket_max_size - 1); K current_key = bucket->keys[key_offset].load(cuda::std::memory_order_relaxed); - found_or_empty_vote = - g.ballot(current_key == EMPTY_KEY || insert_key == current_key); + found_or_empty_vote = g.ballot(current_key == static_cast(EMPTY_KEY) || + insert_key == current_key); if (found_or_empty_vote) { src_lane = __ffs(found_or_empty_vote) - 1; key_pos = (start_idx + tile_offset + src_lane) & (bucket_max_size - 1); @@ -1203,7 +1309,8 @@ __forceinline__ __device__ void lookup_kernel_with_io_core( if (rank == 0) { if (metas != nullptr) { - *(metas + key_idx) = bucket->metas[key_pos].val; + *(metas + key_idx) = + bucket->metas[key_pos].load(cuda::std::memory_order_relaxed); } if (found != nullptr) { *(found + key_idx) = true; @@ -1315,7 +1422,8 @@ __global__ void lookup_kernel(const Table* __restrict table, *(vectors + key_idx) = (bucket->vectors + key_pos * dim); if (metas != nullptr) { - *(metas + key_idx) = bucket->metas[key_pos].val; + *(metas + key_idx) = + bucket->metas[key_pos].load(cuda::std::memory_order_relaxed); } if (found != nullptr) { *(found + key_idx) = true; @@ -1340,7 +1448,8 @@ __global__ void clear_kernel(Table* __restrict table, size_t N) { int bkt_idx = t / bucket_max_size; Bucket* bucket = &(table->buckets[bkt_idx]); - bucket->keys[key_idx].store(EMPTY_KEY, cuda::std::memory_order_relaxed); + bucket->keys[key_idx].store(static_cast(EMPTY_KEY), + cuda::std::memory_order_relaxed); if (key_idx == 0) { table->buckets_size[bkt_idx] = 0; } @@ -1385,7 +1494,7 @@ __global__ void remove_kernel(const Table* __restrict table, break; } - if (g.any(current_key == EMPTY_KEY)) { + if (g.any(current_key == static_cast(EMPTY_KEY))) { break; } } @@ -1397,7 +1506,8 @@ __global__ void remove_kernel(const Table* __restrict table, const int key_pos = (start_idx + tile_offset + src_lane) & (bucket_max_size - 1); (*(bucket->keys + key_pos)) - .store(RECLAIM_KEY, cuda::std::memory_order_relaxed); + .store(static_cast(RECLAIM_KEY), + cuda::std::memory_order_relaxed); atomicSub(&buckets_size[bkt_idx], 1); } break; @@ -1425,16 +1535,18 @@ __global__ void remove_kernel(const Table* __restrict table, Bucket* bucket = buckets + bkt_idx; K current_key = 0; + M current_meta = 0; uint32_t key_offset = 0; while (key_offset < bucket_max_size) { current_key = bucket->keys[key_offset].load(cuda::std::memory_order_relaxed); - if (current_key != EMPTY_KEY) { - if (pred(current_key, bucket->metas[key_offset].val, pattern, - threshold)) { + current_meta = + bucket->metas[key_offset].load(cuda::std::memory_order_relaxed); + if (current_key != static_cast(EMPTY_KEY)) { + if (pred(current_key, current_meta, pattern, threshold)) { atomicAdd(count, 1); key_pos = key_offset; - bucket->keys[key_pos].store(RECLAIM_KEY, + bucket->keys[key_pos].store(static_cast(RECLAIM_KEY), cuda::std::memory_order_relaxed); atomicSub(&buckets_size[bkt_idx], 1); } else { @@ -1476,7 +1588,8 @@ __global__ void dump_kernel(const Table* __restrict table, K* d_key, Bucket* bucket = &(table->buckets[bkt_idx]); const K key = bucket->keys[key_idx].load(cuda::std::memory_order_relaxed); - if (key != EMPTY_KEY && key != RECLAIM_KEY) { + if (key != static_cast(EMPTY_KEY) && + key != static_cast(RECLAIM_KEY)) { size_t local_index = atomicAdd(&block_acc, 1); block_result_key[local_index] = key; for (int i = 0; i < dim; i++) { @@ -1484,7 +1597,8 @@ __global__ void dump_kernel(const Table* __restrict table, K* d_key, bucket->vectors[key_idx * dim + i]; } if (d_meta != nullptr) { - block_result_meta[local_index] = bucket->metas[key_idx].val; + block_result_meta[local_index] = + bucket->metas[key_idx].load(cuda::std::memory_order_relaxed); } } } @@ -1538,9 +1652,10 @@ __global__ void dump_kernel(const Table* __restrict table, Bucket* bucket = &(table->buckets[bkt_idx]); const K key = bucket->keys[key_idx].load(cuda::std::memory_order_relaxed); - M meta = bucket->metas[key_idx].val; + M meta = bucket->metas[key_idx].load(cuda::std::memory_order_relaxed); - if (key != EMPTY_KEY && pred(key, meta, pattern, threshold)) { + if (key != static_cast(EMPTY_KEY) && + pred(key, meta, pattern, threshold)) { size_t local_index = atomicAdd(&block_acc, 1); block_result_key[local_index] = key; for (int i = 0; i < dim; i++) { @@ -1548,7 +1663,7 @@ __global__ void dump_kernel(const Table* __restrict table, bucket->vectors[key_idx * dim + i]); } if (d_meta != nullptr) { - block_result_meta[local_index] = bucket->metas[key_idx].val; + block_result_meta[local_index] = meta; } } } diff --git a/include/merlin/types.cuh b/include/merlin/types.cuh index 274275368..06fcc0203 100644 --- a/include/merlin/types.cuh +++ b/include/merlin/types.cuh @@ -23,11 +23,6 @@ namespace nv { namespace merlin { -template -struct Meta { - M val; -}; - constexpr uint64_t EMPTY_KEY = UINT64_C(0xFFFFFFFFFFFFFFFF); constexpr uint64_t RECLAIM_KEY = UINT64_C(0xFFFFFFFFFFFFFFFE); constexpr uint64_t MAX_META = UINT64_C(0xFFFFFFFFFFFFFFFF); @@ -36,23 +31,29 @@ constexpr uint64_t EMPTY_META = UINT64_C(0); template using AtomicKey = cuda::atomic; +template +using AtomicMeta = cuda::atomic; + +template +using AtomicPos = cuda::atomic; + template struct Bucket { - AtomicKey* keys; // HBM - Meta* metas; // HBM - V* cache; // HBM(optional) - V* vectors; // Pinned memory or HBM + AtomicKey* keys; // HBM + AtomicMeta* metas; // HBM + V* cache; // HBM(optional) + V* vectors; // Pinned memory or HBM /* For upsert_kernel without user specified metas recording the current meta, the cur_meta will increment by 1 when a new inserting happens. */ - M cur_meta; + AtomicMeta cur_meta; /* min_meta and min_pos is for or upsert_kernel with user specified meta. They record the minimum meta and its pos in the bucket. */ - M min_meta; - int min_pos; + AtomicMeta min_meta; + AtomicPos min_pos; }; template @@ -180,5 +181,12 @@ enum class OccupyResult { DUPLICATE, ///< Insert did not succeed, key is already present, }; +enum class OverrideResult { + INITIAL, ///< Initial status + CONTINUE, ///< Override did not succeed, continue trying to override + SUCCESS, ///< Override successfully + REFUSED, ///< Override is refused. +}; + } // namespace merlin } // namespace nv diff --git a/tests/merlin_hashtable_test.cc.cu b/tests/merlin_hashtable_test.cc.cu index 1c23d23e6..c6c74582e 100644 --- a/tests/merlin_hashtable_test.cc.cu +++ b/tests/merlin_hashtable_test.cc.cu @@ -19,12 +19,14 @@ #include #include #include +#include #include #include #include #include #include #include "merlin_hashtable.cuh" +#include "test_util.cuh" uint64_t getTimestamp() { return std::chrono::duration_cast( @@ -59,11 +61,17 @@ void create_random_keys(K* h_keys, M* h_metas, V* h_vectors, int KEY_NUM) { } } -template -void create_continuous_keys(K* h_keys, M* h_metas, int KEY_NUM, K start = 0) { +template +void create_continuous_keys(K* h_keys, M* h_metas, V* h_vectors, int KEY_NUM, + K start = 1) { for (K i = 0; i < KEY_NUM; i++) { h_keys[i] = start + static_cast(i); - h_metas[i] = getTimestamp(); + h_metas[i] = h_keys[i]; + if (h_vectors != nullptr) { + for (size_t j = 0; j < DIM; j++) { + h_vectors[i * DIM + j] = static_cast(h_keys[i] * 0.00001); + } + } } } @@ -80,7 +88,9 @@ inline uint64_t Murmur3HashHost(const uint64_t& key) { template void create_keys_in_one_buckets(K* h_keys, M* h_metas, V* h_vectors, int KEY_NUM, int capacity, - int bucket_max_size = 128, int bucket_idx = 0) { + int bucket_max_size = 128, int bucket_idx = 0, + K min = 0, + K max = static_cast(0xFFFFFFFFFFFFFFFD)) { std::unordered_set numbers; std::random_device rd; std::mt19937_64 eng(rd()); @@ -92,7 +102,7 @@ void create_keys_in_one_buckets(K* h_keys, M* h_metas, V* h_vectors, int i = 0; while (numbers.size() < KEY_NUM) { - candidate = distr(eng) % 100000; + candidate = (distr(eng) % (max - min)) + min; hashed_key = Murmur3HashHost(candidate); global_idx = hashed_key & (capacity - 1); bkt_idx = global_idx / bucket_max_size; @@ -489,9 +499,8 @@ void test_erase_if_pred(size_t max_hbm_for_vectors, bool use_constant_memory) { uint64_t total_size = 0; for (int i = 0; i < TEST_TIMES; i++) { - create_keys_in_one_buckets(h_keys, h_metas, - reinterpret_cast(h_vectors), - KEY_NUM, INIT_CAPACITY); + create_keys_in_one_buckets(h_keys, h_metas, h_vectors, KEY_NUM, + INIT_CAPACITY); CUDA_CHECK(cudaMemcpy(d_keys, h_keys, KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); CUDA_CHECK(cudaMemcpy(d_metas, h_metas, KEY_NUM * sizeof(M), @@ -614,9 +623,8 @@ void test_rehash(size_t max_hbm_for_vectors, bool use_constant_memory) { for (int i = 0; i < TEST_TIMES; i++) { std::unique_ptr table = std::make_unique
(); table->init(options); - create_keys_in_one_buckets( - h_keys, h_metas, reinterpret_cast(h_vectors), KEY_NUM, - INIT_CAPACITY, BUCKET_MAX_SIZE); + create_keys_in_one_buckets(h_keys, h_metas, h_vectors, KEY_NUM, + INIT_CAPACITY, BUCKET_MAX_SIZE); CUDA_CHECK(cudaMemcpy(d_keys, h_keys, KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); CUDA_CHECK(cudaMemcpy(d_metas, h_metas, KEY_NUM * sizeof(M), @@ -1252,6 +1260,690 @@ void test_basic_for_cpu_io(bool use_constant_memory) { CudaCheckError(); } +void test_evict_strategy_lru_basic(size_t max_hbm_for_vectors, + bool use_constant_memory) { + constexpr uint64_t BUCKET_NUM = 8UL; + constexpr uint64_t BUCKET_MAX_SIZE = 128UL; + constexpr uint64_t INIT_CAPACITY = BUCKET_NUM * BUCKET_MAX_SIZE; // 1024UL; + constexpr uint64_t MAX_CAPACITY = INIT_CAPACITY; + constexpr uint64_t BASE_KEY_NUM = BUCKET_MAX_SIZE; + constexpr uint64_t TEST_KEY_NUM = 4; + constexpr uint64_t TEMP_KEY_NUM = std::max(BASE_KEY_NUM, TEST_KEY_NUM); + constexpr uint64_t TEST_TIMES = 128; + + TableOptions options; + + options.init_capacity = INIT_CAPACITY; + options.max_capacity = MAX_CAPACITY; + options.dim = DIM; + options.max_hbm_for_vectors = nv::merlin::GB(max_hbm_for_vectors); + options.evict_strategy = nv::merlin::EvictStrategy::kLru; + options.use_constant_memory = use_constant_memory; + + std::array h_keys_base; + std::array h_metas_base; + std::array h_vectors_base; + + std::array h_keys_test; + std::array h_metas_test; + std::array h_vectors_test; + + std::array h_keys_temp; + std::array h_metas_temp; + std::array h_vectors_temp; + + K* d_keys_temp; + M* d_metas_temp = nullptr; + V* d_vectors_temp; + + CUDA_CHECK(cudaMalloc(&d_keys_temp, TEMP_KEY_NUM * sizeof(K))); + CUDA_CHECK(cudaMalloc(&d_metas_temp, TEMP_KEY_NUM * sizeof(M))); + CUDA_CHECK( + cudaMalloc(&d_vectors_temp, TEMP_KEY_NUM * sizeof(V) * options.dim)); + + create_keys_in_one_buckets( + h_keys_base.data(), h_metas_base.data(), h_vectors_base.data(), + BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF); + + create_keys_in_one_buckets(h_keys_test.data(), h_metas_test.data(), + h_vectors_test.data(), TEST_KEY_NUM, + INIT_CAPACITY, BUCKET_MAX_SIZE, 1, + 0x3FFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFD); + + h_keys_test[2] = h_keys_base[72]; + h_keys_test[3] = h_keys_base[73]; + + for (int i = 0; i < options.dim; i++) { + h_vectors_test[2 * options.dim + i] = h_vectors_base[72 * options.dim + i]; + h_vectors_test[3 * options.dim + i] = h_vectors_base[73 * options.dim + i]; + } + cudaStream_t stream; + CUDA_CHECK(cudaStreamCreate(&stream)); + + size_t total_size = 0; + size_t dump_counter = 0; + for (int i = 0; i < TEST_TIMES; i++) { + std::unique_ptr
table = std::make_unique
(); + table->init(options); + + total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, 0); + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_base.data(), + BASE_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_base.data(), + BASE_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_base.data(), + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(BASE_KEY_NUM, d_keys_temp, d_vectors_temp, + nullptr, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + BASE_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + BASE_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + std::array h_metas_temp_sorted(h_metas_temp); + std::sort(h_metas_temp_sorted.begin(), h_metas_temp_sorted.end()); + + ASSERT_TRUE( + (h_metas_temp_sorted == test_util::range(1))); + for (int i = 0; i < dump_counter; i++) { + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_test.data(), + TEST_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_test.data(), + TEST_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_test.data(), + TEST_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(TEST_KEY_NUM, d_keys_temp, d_vectors_temp, + nullptr, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + TEMP_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + TEMP_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + TEMP_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + std::array h_metas_temp_sorted; + int ctr = 0; + for (int i = 0; i < TEMP_KEY_NUM; i++) { + if (h_keys_test.end() != + std::find(h_keys_test.begin(), h_keys_test.end(), h_keys_temp[i])) { + ASSERT_GT(h_metas_temp[i], BUCKET_MAX_SIZE); + h_metas_temp_sorted[ctr++] = h_metas_temp[i]; + } else { + ASSERT_LE(h_metas_temp[i], BUCKET_MAX_SIZE); + } + } + std::sort(h_metas_temp_sorted.begin(), h_metas_temp_sorted.begin() + ctr); + + ASSERT_TRUE((h_metas_temp_sorted == + test_util::range(BUCKET_MAX_SIZE + 1))); + for (int i = 0; i < dump_counter; i++) { + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + } + CUDA_CHECK(cudaStreamDestroy(stream)); + + CUDA_CHECK(cudaFree(d_keys_temp)); + CUDA_CHECK(cudaFree(d_metas_temp)); + CUDA_CHECK(cudaFree(d_vectors_temp)); + + CUDA_CHECK(cudaDeviceSynchronize()); + + CudaCheckError(); +} + +void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors, + bool use_constant_memory) { + constexpr uint64_t BUCKET_NUM = 8UL; + constexpr uint64_t BUCKET_MAX_SIZE = 128UL; + constexpr uint64_t INIT_CAPACITY = BUCKET_NUM * BUCKET_MAX_SIZE; // 1024UL; + constexpr uint64_t MAX_CAPACITY = INIT_CAPACITY; + constexpr uint64_t BASE_KEY_NUM = BUCKET_MAX_SIZE; + constexpr uint64_t TEST_KEY_NUM = 128; + constexpr uint64_t TEMP_KEY_NUM = std::max(BASE_KEY_NUM, TEST_KEY_NUM); + constexpr uint64_t TEST_TIMES = 128; + + TableOptions options; + + options.init_capacity = INIT_CAPACITY; + options.max_capacity = MAX_CAPACITY; + options.dim = DIM; + options.max_hbm_for_vectors = nv::merlin::GB(max_hbm_for_vectors); + options.evict_strategy = nv::merlin::EvictStrategy::kCustomized; + options.use_constant_memory = use_constant_memory; + + std::array h_keys_base; + std::array h_metas_base; + std::array h_vectors_base; + + std::array h_keys_test; + std::array h_metas_test; + std::array h_vectors_test; + + std::array h_keys_temp; + std::array h_metas_temp; + std::array h_vectors_temp; + + K* d_keys_temp; + M* d_metas_temp = nullptr; + V* d_vectors_temp; + + CUDA_CHECK(cudaMalloc(&d_keys_temp, TEMP_KEY_NUM * sizeof(K))); + CUDA_CHECK(cudaMalloc(&d_metas_temp, TEMP_KEY_NUM * sizeof(M))); + CUDA_CHECK( + cudaMalloc(&d_vectors_temp, TEMP_KEY_NUM * sizeof(V) * options.dim)); + + create_keys_in_one_buckets( + h_keys_base.data(), h_metas_base.data(), h_vectors_base.data(), + BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF); + + const M base_meta_start = 1000; + for (int i = 0; i < BASE_KEY_NUM; i++) { + h_metas_base[i] = base_meta_start + i; + } + + create_keys_in_one_buckets(h_keys_test.data(), h_metas_test.data(), + h_vectors_test.data(), TEST_KEY_NUM, + INIT_CAPACITY, BUCKET_MAX_SIZE, 1, + 0x3FFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFD); + const M test_meta_start = base_meta_start + BASE_KEY_NUM; + for (int i = 0; i < TEST_KEY_NUM; i++) { + h_metas_test[i] = test_meta_start + i; + } + for (int i = 64; i < TEST_KEY_NUM; i++) { + h_keys_test[i] = h_keys_base[i]; + for (int j = 0; j < options.dim; j++) { + h_vectors_test[i * options.dim + j] = h_vectors_base[i * options.dim + j]; + } + } + + cudaStream_t stream; + CUDA_CHECK(cudaStreamCreate(&stream)); + + size_t total_size = 0; + size_t dump_counter = 0; + for (int i = 0; i < TEST_TIMES; i++) { + std::unique_ptr
table = std::make_unique
(); + table->init(options); + + total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, 0); + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_base.data(), + BASE_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_base.data(), + BASE_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_base.data(), + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(BASE_KEY_NUM, d_keys_temp, d_vectors_temp, + d_metas_temp, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + BASE_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + BASE_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + std::array h_metas_temp_sorted(h_metas_temp); + std::sort(h_metas_temp_sorted.begin(), h_metas_temp_sorted.end()); + + ASSERT_TRUE((h_metas_temp_sorted == + test_util::range(base_meta_start))); + for (int i = 0; i < dump_counter; i++) { + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_test.data(), + TEST_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_test.data(), + TEST_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_test.data(), + TEST_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(TEST_KEY_NUM, d_keys_temp, d_vectors_temp, + d_metas_temp, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + TEMP_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + TEMP_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + TEMP_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + std::array h_metas_temp_sorted(h_metas_temp); + std::sort(h_metas_temp_sorted.begin(), h_metas_temp_sorted.end()); + + ASSERT_TRUE((h_metas_temp_sorted == + test_util::range(test_meta_start))); + for (int i = 0; i < dump_counter; i++) { + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + } + CUDA_CHECK(cudaStreamDestroy(stream)); + + CUDA_CHECK(cudaFree(d_keys_temp)); + CUDA_CHECK(cudaFree(d_metas_temp)); + CUDA_CHECK(cudaFree(d_vectors_temp)); + + CUDA_CHECK(cudaDeviceSynchronize()); + + CudaCheckError(); +} + +void test_evict_strategy_customized_advanced(size_t max_hbm_for_vectors, + bool use_constant_memory) { + constexpr uint64_t BUCKET_NUM = 8UL; + constexpr uint64_t BUCKET_MAX_SIZE = 128UL; + constexpr uint64_t INIT_CAPACITY = BUCKET_NUM * BUCKET_MAX_SIZE; // 1024UL; + constexpr uint64_t MAX_CAPACITY = INIT_CAPACITY; + constexpr uint64_t BASE_KEY_NUM = BUCKET_MAX_SIZE; + constexpr uint64_t TEST_KEY_NUM = 8; + constexpr uint64_t TEMP_KEY_NUM = std::max(BASE_KEY_NUM, TEST_KEY_NUM); + constexpr uint64_t TEST_TIMES = 256; + + TableOptions options; + + options.init_capacity = INIT_CAPACITY; + options.max_capacity = MAX_CAPACITY; + options.dim = DIM; + options.max_hbm_for_vectors = nv::merlin::GB(max_hbm_for_vectors); + options.evict_strategy = nv::merlin::EvictStrategy::kCustomized; + options.use_constant_memory = use_constant_memory; + + std::array h_keys_base; + std::array h_metas_base; + std::array h_vectors_base; + + std::array h_keys_test; + std::array h_metas_test; + std::array h_vectors_test; + + std::array h_keys_temp; + std::array h_metas_temp; + std::array h_vectors_temp; + + K* d_keys_temp; + M* d_metas_temp = nullptr; + V* d_vectors_temp; + + CUDA_CHECK(cudaMalloc(&d_keys_temp, TEMP_KEY_NUM * sizeof(K))); + CUDA_CHECK(cudaMalloc(&d_metas_temp, TEMP_KEY_NUM * sizeof(M))); + CUDA_CHECK( + cudaMalloc(&d_vectors_temp, TEMP_KEY_NUM * sizeof(V) * options.dim)); + + create_keys_in_one_buckets( + h_keys_base.data(), h_metas_base.data(), h_vectors_base.data(), + BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF); + + const M base_meta_start = 1000; + for (int i = 0; i < BASE_KEY_NUM; i++) { + h_metas_base[i] = base_meta_start + i; + } + + create_keys_in_one_buckets(h_keys_test.data(), h_metas_test.data(), + h_vectors_test.data(), TEST_KEY_NUM, + INIT_CAPACITY, BUCKET_MAX_SIZE, 1, + 0x3FFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFD); + + h_keys_test[4] = h_keys_base[72]; + h_keys_test[5] = h_keys_base[73]; + h_keys_test[6] = h_keys_base[74]; + h_keys_test[7] = h_keys_base[75]; + + // replace four new keys to lower metas, would not be inserted. + h_metas_test[0] = 20; + h_metas_test[1] = 78; + h_metas_test[2] = 97; + h_metas_test[3] = 98; + + // replace three exist keys to new metas, just refresh the meta for them. + h_metas_test[4] = 99; + h_metas_test[5] = 1010; + h_metas_test[6] = 1020; + h_metas_test[7] = 1035; + + for (int i = 4; i < TEST_KEY_NUM; i++) { + for (int j = 0; j < options.dim; j++) { + h_vectors_test[i * options.dim + j] = + static_cast(h_keys_test[i] * 0.00001); + } + } + + cudaStream_t stream; + CUDA_CHECK(cudaStreamCreate(&stream)); + + size_t total_size = 0; + size_t dump_counter = 0; + for (int i = 0; i < TEST_TIMES; i++) { + std::unique_ptr
table = std::make_unique
(); + table->init(options); + + total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, 0); + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_base.data(), + BASE_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_base.data(), + BASE_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_base.data(), + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(BASE_KEY_NUM, d_keys_temp, d_vectors_temp, + d_metas_temp, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + BASE_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + BASE_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + BASE_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + std::array h_metas_temp_sorted(h_metas_temp); + std::sort(h_metas_temp_sorted.begin(), h_metas_temp_sorted.end()); + + ASSERT_TRUE((h_metas_temp_sorted == + test_util::range(base_meta_start))); + for (int i = 0; i < dump_counter; i++) { + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + + { + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_test.data(), + TEST_KEY_NUM * sizeof(K), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_test.data(), + TEST_KEY_NUM * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_test.data(), + TEST_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(TEST_KEY_NUM, d_keys_temp, d_vectors_temp, + d_metas_temp, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, BUCKET_MAX_SIZE); + + dump_counter = table->export_batch(table->capacity(), 0, d_keys_temp, + d_vectors_temp, d_metas_temp, stream); + ASSERT_EQ(dump_counter, BUCKET_MAX_SIZE); + + CUDA_CHECK(cudaMemcpy(h_keys_temp.data(), d_keys_temp, + TEMP_KEY_NUM * sizeof(K), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp.data(), d_metas_temp, + TEMP_KEY_NUM * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp.data(), d_vectors_temp, + TEMP_KEY_NUM * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + for (int i = 0; i < TEST_KEY_NUM; i++) { + if (i < 4) { + ASSERT_EQ(h_keys_temp.end(), + std::find(h_keys_temp.begin(), h_keys_temp.end(), + h_keys_test[i])); + } else { + ASSERT_NE(h_keys_temp.end(), + std::find(h_keys_temp.begin(), h_keys_temp.end(), + h_keys_test[i])); + } + } + for (int i = 0; i < TEMP_KEY_NUM; i++) { + if (h_keys_temp[i] == h_keys_test[4]) + ASSERT_EQ(h_metas_temp[i], h_metas_test[4]); + if (h_keys_temp[i] == h_keys_test[5]) + ASSERT_EQ(h_metas_temp[i], h_metas_test[5]); + if (h_keys_temp[i] == h_keys_test[6]) + ASSERT_EQ(h_metas_temp[i], h_metas_test[6]); + if (h_keys_temp[i] == h_keys_test[7]) + ASSERT_EQ(h_metas_temp[i], h_metas_test[7]); + + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + } + } + CUDA_CHECK(cudaStreamDestroy(stream)); + + CUDA_CHECK(cudaFree(d_keys_temp)); + CUDA_CHECK(cudaFree(d_metas_temp)); + CUDA_CHECK(cudaFree(d_vectors_temp)); + + CUDA_CHECK(cudaDeviceSynchronize()); + + CudaCheckError(); +} + +void test_evict_strategy_customized_correct_rate(size_t max_hbm_for_vectors, + bool use_constant_memory) { + constexpr uint64_t BATCH_SIZE = 1024 * 1024ul; + constexpr uint64_t STEPS = 128; + constexpr uint64_t MAX_BUCKET_SIZE = 128; + constexpr uint64_t INIT_CAPACITY = BATCH_SIZE * STEPS; + constexpr uint64_t MAX_CAPACITY = INIT_CAPACITY; + constexpr uint64_t TEST_TIMES = 1; + float expected_correct_rate = 0.964; + const int rounds = 3; + + TableOptions options; + + options.init_capacity = INIT_CAPACITY; + options.max_capacity = MAX_CAPACITY; + options.dim = DIM; + options.max_bucket_size = MAX_BUCKET_SIZE; + options.max_hbm_for_vectors = nv::merlin::GB(max_hbm_for_vectors); + options.evict_strategy = nv::merlin::EvictStrategy::kCustomized; + options.use_constant_memory = use_constant_memory; + + K* h_keys_base; + M* h_metas_base; + V* h_vectors_base; + + K* h_keys_temp; + M* h_metas_temp; + V* h_vectors_temp; + + K* d_keys_temp; + M* d_metas_temp = nullptr; + V* d_vectors_temp; + + CUDA_CHECK(cudaMallocHost(&h_keys_base, BATCH_SIZE * sizeof(K))); + CUDA_CHECK(cudaMallocHost(&h_metas_base, BATCH_SIZE * sizeof(M))); + CUDA_CHECK( + cudaMallocHost(&h_vectors_base, BATCH_SIZE * sizeof(V) * options.dim)); + + CUDA_CHECK(cudaMallocHost(&h_keys_temp, MAX_CAPACITY * sizeof(K))); + CUDA_CHECK(cudaMallocHost(&h_metas_temp, MAX_CAPACITY * sizeof(M))); + CUDA_CHECK( + cudaMallocHost(&h_vectors_temp, MAX_CAPACITY * sizeof(V) * options.dim)); + + CUDA_CHECK(cudaMalloc(&d_keys_temp, MAX_CAPACITY * sizeof(K))); + CUDA_CHECK(cudaMalloc(&d_metas_temp, MAX_CAPACITY * sizeof(M))); + CUDA_CHECK( + cudaMalloc(&d_vectors_temp, MAX_CAPACITY * sizeof(V) * options.dim)); + + cudaStream_t stream; + CUDA_CHECK(cudaStreamCreate(&stream)); + + size_t total_size = 0; + size_t global_start_key = 100000; + for (int i = 0; i < TEST_TIMES; i++) { + std::unique_ptr
table = std::make_unique
(); + table->init(options); + size_t start_key = global_start_key; + + total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_EQ(total_size, 0); + + for (int r = 0; r < rounds; r++) { + size_t expected_min_key = global_start_key + INIT_CAPACITY * r; + size_t expected_max_key = global_start_key + INIT_CAPACITY * (r + 1) - 1; + size_t expected_table_size = + (r == 0) ? size_t(expected_correct_rate * INIT_CAPACITY) + : INIT_CAPACITY; + + for (int s = 0; s < STEPS; s++) { + create_continuous_keys(h_keys_base, h_metas_base, + h_vectors_base, BATCH_SIZE, start_key); + start_key += BATCH_SIZE; + + CUDA_CHECK(cudaMemcpy(d_keys_temp, h_keys_base, BATCH_SIZE * sizeof(K), + cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_metas_temp, h_metas_base, + BATCH_SIZE * sizeof(M), cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_vectors_temp, h_vectors_base, + BATCH_SIZE * sizeof(V) * options.dim, + cudaMemcpyHostToDevice)); + table->insert_or_assign(BATCH_SIZE, d_keys_temp, d_vectors_temp, + d_metas_temp, stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + } + + size_t total_size = table->size(stream); + CUDA_CHECK(cudaStreamSynchronize(stream)); + ASSERT_GE(total_size, expected_table_size); + ASSERT_EQ(MAX_CAPACITY, table->capacity()); + + size_t dump_counter = table->export_batch( + MAX_CAPACITY, 0, d_keys_temp, d_vectors_temp, d_metas_temp, stream); + + CUDA_CHECK(cudaMemcpy(h_keys_temp, d_keys_temp, MAX_CAPACITY * sizeof(K), + cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_metas_temp, d_metas_temp, + MAX_CAPACITY * sizeof(M), cudaMemcpyDefault)); + CUDA_CHECK(cudaMemcpy(h_vectors_temp, d_vectors_temp, + MAX_CAPACITY * sizeof(V) * options.dim, + cudaMemcpyDefault)); + + size_t bigger_meta_counter = 0; + K max_key = 0; + + for (int i = 0; i < dump_counter; i++) { + ASSERT_EQ(h_keys_temp[i], h_metas_temp[i]); + max_key = std::max(max_key, h_keys_temp[i]); + if (h_metas_temp[i] >= expected_min_key) bigger_meta_counter++; + for (int j = 0; j < options.dim; j++) { + ASSERT_EQ(h_vectors_temp[i * options.dim + j], + static_cast(h_keys_temp[i] * 0.00001)); + } + } + + float correct_rate = (bigger_meta_counter * 1.0) / MAX_CAPACITY; + std::cout << std::setprecision(3) << "[Round " << r << "]" + << "correct_rate=" << correct_rate << std::endl; + ASSERT_GE(max_key, expected_max_key); + ASSERT_GE(correct_rate, expected_correct_rate); + } + } + CUDA_CHECK(cudaStreamDestroy(stream)); + + CUDA_CHECK(cudaFreeHost(h_keys_base)); + CUDA_CHECK(cudaFreeHost(h_metas_base)); + CUDA_CHECK(cudaFreeHost(h_vectors_base)); + + CUDA_CHECK(cudaFreeHost(h_keys_temp)); + CUDA_CHECK(cudaFreeHost(h_metas_temp)); + CUDA_CHECK(cudaFreeHost(h_vectors_temp)); + + CUDA_CHECK(cudaFree(d_keys_temp)); + CUDA_CHECK(cudaFree(d_metas_temp)); + CUDA_CHECK(cudaFree(d_vectors_temp)); + + CUDA_CHECK(cudaDeviceSynchronize()); + + CudaCheckError(); +} TEST(MerlinHashTableTest, test_basic) { test_basic(16, true); test_basic(0, true); @@ -1298,3 +1990,31 @@ TEST(MerlinHashTableTest, test_basic_for_cpu_io) { test_basic_for_cpu_io(true); test_basic_for_cpu_io(false); } + +TEST(MerlinHashTableTest, test_evict_strategy_lru_basic) { + test_evict_strategy_lru_basic(16, true); + test_evict_strategy_lru_basic(0, true); + test_evict_strategy_lru_basic(16, false); + test_evict_strategy_lru_basic(0, false); +} + +TEST(MerlinHashTableTest, test_evict_strategy_customized_basic) { + test_evict_strategy_customized_basic(16, true); + test_evict_strategy_customized_basic(0, true); + test_evict_strategy_customized_basic(16, false); + test_evict_strategy_customized_basic(0, false); +} + +TEST(MerlinHashTableTest, test_evict_strategy_customized_advanced) { + test_evict_strategy_customized_advanced(16, true); + test_evict_strategy_customized_advanced(0, true); + test_evict_strategy_customized_advanced(16, false); + test_evict_strategy_customized_advanced(0, false); +} + +TEST(MerlinHashTableTest, test_evict_strategy_customized_correct_rate) { + test_evict_strategy_customized_correct_rate(16, true); + test_evict_strategy_customized_advanced(0, true); + test_evict_strategy_customized_advanced(16, false); + test_evict_strategy_customized_advanced(0, false); +} \ No newline at end of file diff --git a/tests/test_util.cuh b/tests/test_util.cuh index 95915dabf..1fd181af7 100644 --- a/tests/test_util.cuh +++ b/tests/test_util.cuh @@ -206,4 +206,14 @@ bool tables_equal(TableType* a, TableType* b, cudaStream_t stream) { return true; } +template +std::array range(const T start) { + std::array result; + size_t i = 0; + while (i < N) { + result[i] = start + i; + i++; + } + return result; +} } // namespace test_util