diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..4371487 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2020 Parallel and Distributed Architectures + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/benchmark/Makefile b/benchmark/Makefile index f94a163..bb57972 100644 --- a/benchmark/Makefile +++ b/benchmark/Makefile @@ -1,7 +1,3 @@ -SRCDIR := src -BINDIR := bin -INCDIR := ../include - INCDIRS := ../include CC := g++ @@ -9,26 +5,34 @@ STD := c++14 NVCC := nvcc CCFLAGS := -O3 -Wall -Wextra -fopenmp -DNDEBUG NVCCGENCODE = -gencode arch=compute_70,code=sm_70 -NVCCFLAGS := -std=$(STD) $(NVCCGENCODE) --expt-extended-lambda -DNDEBUG -ccbin $(CC) $(addprefix -Xcompiler ,$(CCFLAGS)) +NVCCFLAGS := -std=$(STD) $(NVCCGENCODE) --expt-extended-lambda --expt-relaxed-constexpr -DNDEBUG -ccbin $(CC) $(addprefix -Xcompiler ,$(CCFLAGS)) INCS := $(foreach dir, $(INCDIRS), $(wildcard $(dir)/*.cuh $(dir)/*.h $(dir)/*.hpp)) INCPARAMS := $(addprefix -I, $(INCDIRS)) -all: single_value multi_value counting bloom_filter bucket_list +all: bin/single_value_benchmark.out \ + bin/multi_value_benchmark.out \ + bin/multi_bucket_benchmark.out \ + bin/counting_benchmark.out \ + bin/bloom_filter_benchmark.out \ + bin/bucket_list_benchmark.out -single_value: ${INCS} | bin +bin/single_value_benchmark.out: src/single_value_benchmark.cu src/common.cuh ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/single_value_benchmark.cu -o bin/single_value_benchmark.out -multi_value: ${INCS} | bin +bin/multi_value_benchmark.out: src/multi_value_benchmark.cu src/common.cuh ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/multi_value_benchmark.cu -o bin/multi_value_benchmark.out -counting: ${INCS} | bin +bin/multi_bucket_benchmark.out: src/multi_bucket_benchmark.cu src/common.cuh ${INCS} | bin + $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/multi_bucket_benchmark.cu -o bin/multi_bucket_benchmark.out + +bin/counting_benchmark.out: src/counting_benchmark.cu src/common.cuh ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/counting_benchmark.cu -o bin/counting_benchmark.out -bloom_filter: ${INCS} | bin +bin/bloom_filter_benchmark.out: src/bloom_filter_benchmark.cu src/common.cuh ${INCS} | bin $(NVCC) 
$(NVCCFLAGS) $(INCPARAMS) src/bloom_filter_benchmark.cu -o bin/bloom_filter_benchmark.out -bucket_list: ${INCS} | bin +bin/bucket_list_benchmark.out: src/bucket_list_benchmark.cu src/common.cuh ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/bucket_list_benchmark.cu -o bin/bucket_list_benchmark.out debug: OPT := 0 @@ -43,7 +47,7 @@ profile: all clean: $(RM) -r bin -$(BINDIR): +bin: mkdir -p $@ -.PHONY: clean all bin +.PHONY: clean all debug profile diff --git a/benchmark/src/bucket_list_benchmark.cu b/benchmark/src/bucket_list_benchmark.cu index 8d930e8..b439e99 100644 --- a/benchmark/src/bucket_list_benchmark.cu +++ b/benchmark/src/bucket_list_benchmark.cu @@ -1,18 +1,15 @@ -#include +#include "common.cuh" +#include "warpcore.cuh" +#include "../../ext/hpc_helpers/include/io_helpers.h" +#include #include -#include -#include -#include +#include #include #include -#include -#include -#include "warpcore.cuh" -#include "../../ext/hpc_helpers/include/io_helpers.h" +#include template bool sufficient_memory( - size_t input_size, size_t key_store_capacity, size_t value_store_capacity, float headroom_factor = 1.1) @@ -21,8 +18,7 @@ bool sufficient_memory( const size_t table_bytes = key_handle_bytes*key_store_capacity; const size_t value_bytes = std::min(sizeof(Value), sizeof(uint64_t)); const size_t value_store_bytes = value_bytes * value_store_capacity; - const size_t io_bytes = (sizeof(Key)+sizeof(Value)+sizeof(uint64_t))*input_size; - const size_t total_bytes = (table_bytes+value_store_bytes+io_bytes)*headroom_factor; + const size_t total_bytes = (table_bytes+value_store_bytes)*headroom_factor; size_t bytes_free, bytes_total; cudaMemGetInfo(&bytes_free, &bytes_total); CUERR @@ -30,77 +26,53 @@ bool sufficient_memory( return (total_bytes <= bytes_free); } -uint64_t memory_partition(float factor = 0.4) -{ - size_t bytes_free, bytes_total; - cudaMemGetInfo(&bytes_free, &bytes_total); CUERR - - return bytes_free * factor; -} - -template -uint64_t num_unique(const 
std::vector& v) noexcept -{ - T * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(T) * v.size()); CUERR - cudaMemcpy(keys_d, v.data(), sizeof(T) * v.size(), H2D); CUERR - - auto set = warpcore::HashSet(v.size()); - - set.insert(keys_d, v.size()); - - cudaFree(keys_d); - - return set.size(); -} - template HOSTQUALIFIER INLINEQUALIFIER void bucket_list_benchmark( - const std::vector& keys, - uint64_t key_store_capacity, - uint64_t value_store_capacity, - std::vector input_sizes = {(1UL<<27)}, - std::vector> slab_list_configs = {{1.1, 1, 0}}, + const typename HashTable::key_type * keys_d, + const uint64_t max_keys, + float key_load_factor, + float value_load_factor, + std::vector input_sizes, + std::vector> slab_list_configs, typename HashTable::key_type seed = warpcore::defaults::seed(), - uint64_t dev_id = 0, bool print_headers = true, uint8_t iters = 1, std::chrono::milliseconds thermal_backoff = std::chrono::milliseconds(100)) { using index_t = typename HashTable::index_type; - cudaSetDevice(dev_id); CUERR - using key_t = typename HashTable::key_type; using value_t = typename HashTable::value_type; + const uint64_t max_unique_size = num_unique(keys_d, max_keys); + + std::cout << "unique_keys: " << max_unique_size << "\tvalues: " << max_keys << std::endl; + + const uint64_t key_store_capacity = max_unique_size / key_load_factor; + const uint64_t value_store_capacity = max_keys / value_load_factor; + + key_t* query_keys_d = nullptr; + cudaMalloc(&query_keys_d, sizeof(key_t)*max_keys); CUERR + value_t* values_d = nullptr; + cudaMalloc(&values_d, sizeof(value_t)*max_keys); CUERR + index_t * offsets_d = nullptr; + cudaMalloc(&offsets_d, sizeof(index_t)*(max_keys+1)); CUERR + const auto max_input_size = *std::max_element(input_sizes.begin(), input_sizes.end()); - if(max_input_size > keys.size()) + if(max_input_size > max_keys) { std::cerr << "Maximum input size exceeded." 
<< std::endl; exit(1); } - if(!sufficient_memory( - max_input_size, key_store_capacity, value_store_capacity)) + if(!sufficient_memory(key_store_capacity, value_store_capacity)) { std::cerr << "Not enough GPU memory." << std::endl; exit(1); } - key_t* keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t)*max_input_size); CUERR - key_t* unique_keys_d = nullptr; - cudaMalloc(&unique_keys_d, sizeof(key_t)*max_input_size); CUERR - value_t* values_d = nullptr; - cudaMalloc(&values_d, sizeof(value_t)*max_input_size); CUERR - index_t * offsets_d = nullptr; - cudaMalloc(&offsets_d, sizeof(index_t)*(max_input_size+1)); CUERR - - cudaMemcpy(keys_d, keys.data(), sizeof(key_t)*max_input_size, H2D); CUERR - for(const auto& size : input_sizes) { for(const auto& slab_list_config : slab_list_configs) @@ -116,142 +88,46 @@ void bucket_list_benchmark( slab_grow_factor, min_slab_size); - std::vector insert_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - hash_table.init(); - cudaEvent_t insert_start, insert_stop; - float t; - cudaEventCreate(&insert_start); - cudaEventCreate(&insert_stop); - cudaEventRecord(insert_start, 0); - hash_table.insert(keys_d, values_d, size); - cudaEventRecord(insert_stop, 0); - cudaEventSynchronize(insert_stop); - cudaEventElapsedTime(&t, insert_start, insert_stop); - cudaDeviceSynchronize(); CUERR - insert_times[i] = t; - std::this_thread::sleep_for(thermal_backoff); - } - const float insert_time = - *std::min_element(insert_times.begin(), insert_times.end()); - - index_t key_size_out = 0; - index_t value_size_out = 0; + Output output; + output.sample_size = size; + output.key_capacity = hash_table.key_capacity(); + output.value_capacity = hash_table.value_capacity(); - hash_table.retrieve_all_keys(unique_keys_d, key_size_out); + output.insert_ms = benchmark_insert( + hash_table, keys_d, values_d, size, + iters, thermal_backoff); - /* - key_size_out = size; - helpers::lambda_kernel<<>>( - [=] DEVICEQUALIFIER - { - const auto tid = blockDim.x * 
blockIdx.x + threadIdx.x; + output.query_ms = benchmark_query_multi( + hash_table, query_keys_d, size, + offsets_d, values_d, + iters, thermal_backoff); - if(tid >= size) return; - unique_keys_d[tid] = tid + 1; - }); - cudaDeviceSynchronize(); CUERR - */ + // output.query_ms = benchmark_query_unique( + // hash_table, query_keys_d, offsets_d, values_d, + // iters, thermal_backoff); - std::vector query_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - cudaEvent_t query_start, query_stop; - float t; - cudaEventCreate(&query_start); - cudaEventCreate(&query_stop); - cudaEventRecord(query_start, 0); - hash_table.retrieve( - unique_keys_d, - key_size_out, - offsets_d, - offsets_d+1, - values_d, - value_size_out); - cudaEventRecord(query_stop, 0); - cudaEventSynchronize(query_stop); - cudaEventElapsedTime(&t, query_start, query_stop); - cudaDeviceSynchronize(); CUERR - query_times[i] = t; - std::this_thread::sleep_for(thermal_backoff); - } - const float query_time = - *std::min_element(query_times.begin(), query_times.end()); + output.key_load_factor = hash_table.key_load_factor(); + output.value_load_factor = hash_table.value_load_factor(); + output.density = hash_table.storage_density(); + output.relative_density = hash_table.relative_storage_density(); + output.status = hash_table.pop_status(); - const uint64_t total_bytes = (sizeof(key_t) + sizeof(value_t))*size; - uint64_t ips = size/(insert_time/1000); - uint64_t qps = size/(query_time/1000); - float itp = helpers::B2GB(total_bytes) / (insert_time/1000); - float qtp = helpers::B2GB(total_bytes) / (query_time/1000); - float key_load = hash_table.key_load_factor(); - float value_load = hash_table.value_load_factor(); - float density = hash_table.storage_density(); - float relative_density = hash_table.relative_storage_density(); - warpcore::Status status = hash_table.pop_status(); + std::cout << std::fixed + << "grow_factor=" << slab_grow_factor + << output.d << "min_slab_size=" << min_slab_size + << output.d 
<< "max_slab_size=" << max_slab_size + << output.d; if(print_headers) - { - const char d = ' '; - - std::cout << "N=" << size << std::fixed - << d << "key_capacity=" << key_store_capacity - << d << "value_capacity=" << value_store_capacity - << d << "bits_key=" << sizeof(key_t)*CHAR_BIT - << d << "bits_value=" << sizeof(value_t)*CHAR_BIT - << d << "mb_keys=" << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << "mb_values=" << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << "grow_factor=" << slab_grow_factor - << d << "min_slab_size=" << min_slab_size - << d << "max_slab_size=" << max_slab_size - << d << "key_load=" << key_load - << d << "value_load=" << value_load - << d << "density=" << density - << d << "relative_density=" << relative_density - << d << "insert_ms=" << insert_time - << d << "query_ms=" << query_time - << d << "IPS=" << ips - << d << "QPS=" << qps - << d << "insert_GB/s=" << itp - << d << "query_GB/s=" << qtp - << d << "status=" << status << std::endl; - } + output.print_with_headers(); else - { - const char d = ' '; - - std::cout << std::fixed - << size - << d << key_store_capacity - << d << value_store_capacity - << d << sizeof(key_t)*CHAR_BIT - << d << sizeof(value_t)*CHAR_BIT - << d << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << slab_grow_factor - << d << min_slab_size - << d << max_slab_size - << d << key_load - << d << value_load - << d << density - << d << relative_density - << d << insert_time - << d << query_time - << d << ips - << d << qps - << d << itp - << d << qtp - << d << status << std::endl; - } + output.print_without_headers(); } } - cudaFree(keys_d); CUERR - cudaFree(unique_keys_d); CUERR + cudaFree(query_keys_d); CUERR cudaFree(values_d); CUERR cudaFree(offsets_d); CUERR - - cudaDeviceSynchronize(); CUERR } int main(int argc, char* argv[]) @@ -261,59 +137,36 @@ int main(int argc, char* argv[]) using key_t = std::uint32_t; using value_t = std::uint32_t; 
- using hash_table_t = BucketListHashTable< - key_t, - value_t, - defaults::empty_key(), - defaults::tombstone_key(), - storage::multi_value::BucketListStore, - defaults::probing_scheme_t>; + const uint64_t max_keys = 1UL << 27; + const bool print_headers = true; - const uint64_t max_keys = 1UL << 27; uint64_t dev_id = 0; - std::vector keys; - if(argc > 2) dev_id = std::atoi(argv[2]); + cudaSetDevice(dev_id); CUERR + key_t * keys_d = nullptr; if(argc > 1) - { - keys = helpers::load_binary(argv[1], max_keys); - } + keys_d = load_keys(argv[1], max_keys); else - { - keys.resize(max_keys); + keys_d = generate_keys(max_keys, 8); - key_t * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR - - helpers::lambda_kernel - <<>> - ([=] DEVICEQUALIFIER - { - const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; - - if(tid < max_keys) - { - // 8 values per key - keys_d[tid] = (tid % (max_keys / 8)) + 1; - } - }); - - cudaMemcpy(keys.data(), keys_d, sizeof(key_t) * max_keys, D2H); CUERR - - cudaFree(keys_d); CUERR - } + using hash_table_t = BucketListHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + storage::multi_value::BucketListStore, + defaults::probing_scheme_t>; - std::cout << "unique_keys: " << num_unique(keys) << "\tvalues: " << keys.size() << std::endl; bucket_list_benchmark( - keys, - num_unique(keys) / 0.90, - keys.size() / 0.50, + keys_d, max_keys, + 0.90, + 0.50, {max_keys}, {{1.1, 1, 0}}, 0x5ad0ded, - dev_id); + print_headers); - cudaDeviceSynchronize(); CUERR + cudaFree(keys_d); CUERR } diff --git a/benchmark/src/common.cuh b/benchmark/src/common.cuh new file mode 100644 index 0000000..888a2cc --- /dev/null +++ b/benchmark/src/common.cuh @@ -0,0 +1,379 @@ + + +#ifndef BENCHMARK_COMMON_CUH +#define BENCHMARK_COMMON_CUH + +#include "warpcore.cuh" +#include "../../ext/hpc_helpers/include/cuda_helpers.cuh" +#include "../../ext/hpc_helpers/include/io_helpers.h" +#include +#include +#include +#include 
+#include + +uint64_t memory_partition(float factor = 0.4) +{ + size_t bytes_free, bytes_total; + cudaMemGetInfo(&bytes_free, &bytes_total); CUERR + + return bytes_free * factor; +} + +template +uint64_t num_unique(const T* keys_d, const uint64_t size) noexcept +{ + auto set = warpcore::HashSet(size); + + set.insert(keys_d, size); + + return set.size(); +} + +template +key_t * generate_keys( + const uint64_t max_keys = 1UL << 27, + const uint32_t multiplicity = 1) +{ + key_t * keys_d = nullptr; + cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR + + helpers::lambda_kernel + <<>> + ([=] DEVICEQUALIFIER + { + const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; + + if(tid < max_keys) + { + keys_d[tid] = (tid % (max_keys / multiplicity)) + 1; + } + }); + + cudaDeviceSynchronize(); CUERR + + return keys_d; +} + +template +key_t * load_keys( + const char* file_name, + const uint64_t max_keys = 1UL << 27) +{ + std::vector keys = helpers::load_binary(file_name, max_keys); + + key_t * keys_d = nullptr; + cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR + + cudaMemcpy(keys_d, keys.data(), sizeof(key_t) * max_keys, H2D); CUERR + + return keys_d; +} + +template +bool sufficient_memory_oa(size_t target_capacity, float headroom_factor = 1.1) +{ + using key_t = typename HashTable::key_type; + using value_t = typename HashTable::value_type; + + const size_t capacity = + warpcore::detail::get_valid_capacity(target_capacity, HashTable::cg_size()); + const size_t key_val_bytes = sizeof(key_t) + sizeof(value_t); + const size_t table_bytes = key_val_bytes*capacity; + const size_t total_bytes = table_bytes*headroom_factor; + + size_t bytes_free, bytes_total; + cudaMemGetInfo(&bytes_free, &bytes_total); CUERR + + return (total_bytes <= bytes_free); +} + +template +float benchmark_insert( + HashTable& hash_table, + const typename HashTable::key_type * keys_d, + const uint64_t size, + const uint8_t iters, + const std::chrono::milliseconds thermal_backoff) +{ + std::vector 
insert_times(iters); + for(uint64_t i = 0; i < iters; i++) + { + hash_table.init(); + cudaEvent_t insert_start, insert_stop; + float t; + cudaEventCreate(&insert_start); + cudaEventCreate(&insert_stop); + cudaEventRecord(insert_start, 0); + hash_table.insert(keys_d, size); + cudaEventRecord(insert_stop, 0); + cudaEventSynchronize(insert_stop); + cudaEventElapsedTime(&t, insert_start, insert_stop); + cudaDeviceSynchronize(); CUERR + insert_times[i] = t; + std::this_thread::sleep_for (thermal_backoff); + } + return *std::min_element(insert_times.begin(), insert_times.end()); +} + +template +float benchmark_insert( + HashTable& hash_table, + const typename HashTable::key_type * keys_d, + const typename HashTable::value_type * values_d, + const uint64_t size, + const uint8_t iters, + const std::chrono::milliseconds thermal_backoff) +{ + std::vector insert_times(iters); + for(uint64_t i = 0; i < iters; i++) + { + hash_table.init(); + cudaEvent_t insert_start, insert_stop; + float t; + cudaEventCreate(&insert_start); + cudaEventCreate(&insert_stop); + cudaEventRecord(insert_start, 0); + hash_table.insert(keys_d, values_d, size); + cudaEventRecord(insert_stop, 0); + cudaEventSynchronize(insert_stop); + cudaEventElapsedTime(&t, insert_start, insert_stop); + cudaDeviceSynchronize(); CUERR + insert_times[i] = t; + std::this_thread::sleep_for (thermal_backoff); + } + return *std::min_element(insert_times.begin(), insert_times.end()); +} + +template +float benchmark_query( + HashTable& hash_table, + const typename HashTable::key_type * keys_d, + typename HashTable::value_type * values_d, + const uint64_t size, + const uint8_t iters, + const std::chrono::milliseconds thermal_backoff) +{ + std::vector query_times(iters); + for(uint64_t i = 0; i < iters; i++) + { + cudaEvent_t query_start, query_stop; + float t; + cudaEventCreate(&query_start); + cudaEventCreate(&query_stop); + cudaEventRecord(query_start, 0); + hash_table.retrieve(keys_d, size, values_d); + 
cudaEventRecord(query_stop, 0); + cudaEventSynchronize(query_stop); + cudaEventElapsedTime(&t, query_start, query_stop); + cudaDeviceSynchronize(); CUERR + query_times[i] = t; + std::this_thread::sleep_for(thermal_backoff); + } + return *std::min_element(query_times.begin(), query_times.end()); +} + +template +float benchmark_query_multi( + HashTable& hash_table, + typename HashTable::key_type * keys_d, + const uint64_t size, + typename HashTable::index_type * offsets_d, + typename HashTable::value_type * values_d, + const uint8_t iters, + const std::chrono::milliseconds thermal_backoff) +{ + using index_t = typename HashTable::index_type; + + helpers::lambda_kernel + <<>> + ([=] DEVICEQUALIFIER + { + const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; + + if(tid < size) + { + keys_d[tid] = tid + 1; + } + }); + cudaDeviceSynchronize(); CUERR + + index_t value_size_out = 0; + + std::vector query_times(iters); + for(uint64_t i = 0; i < iters; i++) + { + cudaEvent_t query_start, query_stop; + float t; + cudaEventCreate(&query_start); + cudaEventCreate(&query_stop); + cudaEventRecord(query_start, 0); + hash_table.retrieve( + keys_d, + size, + offsets_d, + offsets_d+1, + values_d, + value_size_out); + cudaEventRecord(query_stop, 0); + cudaEventSynchronize(query_stop); + cudaEventElapsedTime(&t, query_start, query_stop); + cudaDeviceSynchronize(); CUERR + query_times[i] = t; + std::this_thread::sleep_for(thermal_backoff); + } + return *std::min_element(query_times.begin(), query_times.end()); +} + +template +float benchmark_query_unique( + HashTable& hash_table, + typename HashTable::key_type * unique_keys_d, + typename HashTable::index_type * offsets_d, + typename HashTable::value_type * values_d, + const uint8_t iters, + const std::chrono::milliseconds thermal_backoff) +{ + using index_t = typename HashTable::index_type; + + index_t key_size_out = 0; + index_t value_size_out = 0; + + hash_table.retrieve_all_keys(unique_keys_d, key_size_out); CUERR + + 
std::vector query_times(iters); + for(uint64_t i = 0; i < iters; i++) + { + cudaEvent_t query_start, query_stop; + float t; + cudaEventCreate(&query_start); + cudaEventCreate(&query_stop); + cudaEventRecord(query_start, 0); + hash_table.retrieve( + unique_keys_d, + key_size_out, + offsets_d, + offsets_d+1, + values_d, + value_size_out); + cudaEventRecord(query_stop, 0); + cudaEventSynchronize(query_stop); + cudaEventElapsedTime(&t, query_start, query_stop); + cudaDeviceSynchronize(); CUERR + query_times[i] = t; + std::this_thread::sleep_for(thermal_backoff); + } + return *std::min_element(query_times.begin(), query_times.end()); +} + +class NoValue {}; + +template +struct Output { +// private: + // enum mode { Set, KeyValue, MultiValue }; + // mode mode_ = Set; + +public: + char d = ' '; + uint64_t sample_size = 0; // num_keys? + uint64_t key_capacity = 0; + uint64_t value_capacity = 0; + float key_load_factor = 0; + float value_load_factor = 0; + float density = 0; + float relative_density = 0; + float insert_ms = 0; + float query_ms = 0; + + // void mode_set() { mode_ = Set; } + // void mode_key_value() { mode_ = KeyValue; } + // void mode_multi_value() { mode_ = MultiValue; } + + uint32_t bits_key() const + { + return sizeof(key_t)*CHAR_BIT; + } + uint32_t bits_value() const + { + if(std::is_same::value) + return 0; + else + return sizeof(value_t)*CHAR_BIT; + } + float mb_keys() const { + return helpers::B2MB(sizeof(key_t)*sample_size); + } + float mb_values() const + { + if(std::is_same::value) + return 0; + else + return helpers::B2MB(sizeof(value_t)*sample_size); + } + float mb_total() const { + if(std::is_same::value) + return mb_keys(); + else + return mb_keys() + mb_values(); + } + + float insert_s() const { return insert_ms/1000; } + float query_s() const { return query_ms/1000; } + uint64_t inserts_per_second() const { return sample_size/insert_s(); } + uint64_t queries_per_second() const { return sample_size/query_s(); } + float insert_troughput_gbps() 
const { return mb_total()/1024 / insert_s(); } + float query_troughput_gbps() const { return mb_total()/1024 / query_s(); } + + warpcore::Status status = warpcore::Status::none(); + + void print_with_headers() const noexcept + { + std::cout << std::fixed + << "sample_size=" << sample_size + << d << "key_capacity=" << key_capacity + << d << "value_capacity=" << value_capacity + << d << "bits_key=" << bits_key() + << d << "bits_value=" << bits_value() + << d << "mb_keys=" << mb_keys() + << d << "mb_values=" << mb_values() + << d << "key_load=" << key_load_factor + << d << "value_load=" << value_load_factor + << d << "density=" << density + << d << "relative_density=" << relative_density + << d << "insert_ms=" << insert_ms + << d << "query_ms=" << query_ms + << d << "IPS=" << inserts_per_second() + << d << "QPS=" << queries_per_second() + << d << "insert_GB/s=" << insert_troughput_gbps() + << d << "query_GB/s=" << query_troughput_gbps() + << d << "status=" << status + << std::endl; + } + + void print_without_headers() const noexcept + { + std::cout << std::fixed + << sample_size + << d << key_capacity + << d << value_capacity + << d << bits_key() + << d << bits_value() + << d << mb_keys() + << d << mb_values() + << d << key_load_factor + << d << value_load_factor + << d << density + << d << relative_density + << d << insert_ms + << d << query_ms + << d << inserts_per_second() + << d << queries_per_second() + << d << insert_troughput_gbps() + << d << query_troughput_gbps() + << d << status + << std::endl; + } +}; + +#endif \ No newline at end of file diff --git a/benchmark/src/counting_benchmark.cu b/benchmark/src/counting_benchmark.cu index 7ac0ac9..392b34f 100644 --- a/benchmark/src/counting_benchmark.cu +++ b/benchmark/src/counting_benchmark.cu @@ -1,125 +1,79 @@ -#include -#include -#include -#include +#include "common.cuh" #include "warpcore.cuh" #include "../../ext/hpc_helpers/include/io_helpers.h" - -template -uint64_t num_unique(const std::vector& v) noexcept 
-{ - T * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(T) * v.size()); CUERR - cudaMemcpy(keys_d, v.data(), sizeof(T) * v.size(), H2D); CUERR - - auto set = warpcore::HashSet(v.size()); - - set.insert(keys_d, v.size()); - - cudaFree(keys_d); - - return set.size(); -} +#include +#include +#include template< - class Key, - class Count, - class Table> + class HashTable> HOSTQUALIFIER INLINEQUALIFIER void counting_benchmark( - std::vector keys_h, - float load, - uint8_t dev_id = 0, + const typename HashTable::key_type * keys_d, + const uint64_t max_keys, + std::vector input_sizes, + std::vector load_factors, bool print_headers = true, - uint8_t iters = 5) + uint8_t iters = 5, + std::chrono::milliseconds thermal_backoff = std::chrono::milliseconds(100)) { - cudaSetDevice(dev_id); CUERR + using key_t = typename HashTable::key_type; + using count_t = typename HashTable::value_type; - uint64_t size = num_unique(keys_h); - uint64_t capacity = size/load; + count_t* counts_d = nullptr; + cudaMalloc(&counts_d, sizeof(count_t)*max_keys); CUERR - Key* keys_d = nullptr; cudaMalloc(&keys_d, sizeof(Key)*keys_h.size()); CUERR - Count* counts_d = nullptr; cudaMalloc(&counts_d, sizeof(Count)*keys_h.size()); CUERR + const auto max_input_size = + *std::max_element(input_sizes.begin(), input_sizes.end()); + const auto min_load_factor = + *std::min_element(load_factors.begin(), load_factors.end()); - cudaMemcpy(keys_d, keys_h.data(), sizeof(Key)*keys_h.size(), H2D); CUERR + if(max_input_size > max_keys) + { + std::cerr << "Maximum input size exceeded." 
<< std::endl; + exit(1); + } - Table hash_table(capacity); + const uint64_t max_unique_size = num_unique(keys_d, max_input_size); - float insert_time = 0.0; - for(uint8_t i = 0; i < iters; i++) + if(!sufficient_memory_oa(max_unique_size / min_load_factor)) { - hash_table.init(); - cudaEvent_t insert_start, insert_stop; - float t; - cudaEventCreate(&insert_start); - cudaEventCreate(&insert_stop); - cudaEventRecord(insert_start, 0); - hash_table.insert(keys_d, keys_h.size()); - cudaEventRecord(insert_stop, 0); - cudaEventSynchronize(insert_stop); - cudaEventElapsedTime(&t, insert_start, insert_stop); - cudaDeviceSynchronize(); CUERR - insert_time += t; + std::cerr << "Not enough GPU memory." << std::endl; + exit(1); } - insert_time /= iters; - float query_time = 0.0; - for(uint8_t i = 0; i < iters; i++) + for(auto size : input_sizes) { - cudaEvent_t query_start, query_stop; - float t; - cudaEventCreate(&query_start); - cudaEventCreate(&query_stop); - cudaEventRecord(query_start, 0); - hash_table.retrieve(keys_d, keys_h.size(), counts_d); - cudaEventRecord(query_stop, 0); - cudaEventSynchronize(query_stop); - cudaEventElapsedTime(&t, query_start, query_stop); - cudaDeviceSynchronize(); CUERR - query_time += t; - } - query_time /= iters; + for(auto load : load_factors) + { + const uint64_t unique_size = num_unique(keys_d, size); + const uint64_t capacity = unique_size/load; - uint64_t ips = keys_h.size()/insert_time*1000; - uint64_t qps = keys_h.size()/query_time*1000; - float itp = helpers::B2GB(sizeof(key_t)*keys_h.size()) / (insert_time/1000); - float qtp = helpers::B2GB(sizeof(key_t)*keys_h.size()) / (query_time/1000); - float actual_load = float(hash_table.size())/float(capacity); + HashTable hash_table(capacity); - if(print_headers) - { - const char d = ' '; - - std::cout << std::fixed - << "N=" << keys_h.size() - << d << "C=" << capacity - << d << "bits_key=" << sizeof(Key)*8 - << d << "bits_count=" << sizeof(Count)*8 - << d << "load=" << actual_load - << d << 
"IPS=" << ips - << d << "QPS=" << qps - << d << "insert_GB/s=" << itp - << d << "query_GB/s=" << qtp - << d << "status=" << hash_table.pop_status() << std::endl; - } - else - { - const char d = ' '; - - std::cout << std::fixed - << keys_h.size() - << d << capacity - << d << sizeof(Key)*8 - << d << sizeof(Count)*8 - << d << actual_load - << d << ips - << d << qps - << d << itp - << d << qtp - << d << hash_table.pop_status() << std::endl; + Output output; + output.sample_size = size; + output.key_capacity = hash_table.capacity(); + + output.insert_ms = benchmark_insert( + hash_table, keys_d, size, + iters, thermal_backoff); + + output.query_ms = benchmark_query( + hash_table, keys_d, counts_d, size, + iters, thermal_backoff); + + output.key_load_factor = hash_table.load_factor(); + output.density = output.key_load_factor; + output.status = hash_table.pop_status(); + + if(print_headers) + output.print_with_headers(); + else + output.print_without_headers(); + } } - cudaFree(keys_d); CUERR cudaFree(counts_d); CUERR } @@ -127,42 +81,25 @@ int main(int argc, char* argv[]) { using key_t = uint32_t; using count_t = uint32_t; - using hash_table_t = warpcore::CountingHashTable; const uint64_t max_keys = 1UL << 28; - uint64_t dev_id = 0; - std::vector keys; + const bool print_headers = true; + + uint64_t dev_id = 0; if(argc > 2) dev_id = std::atoi(argv[2]); + cudaSetDevice(dev_id); CUERR + key_t * keys_d = nullptr; if(argc > 1) - { - keys = helpers::load_binary(argv[1], max_keys); - } + keys_d = load_keys(argv[1], max_keys); else - { - keys.resize(max_keys); - - key_t * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR - - helpers::lambda_kernel - <<>> - ([=] DEVICEQUALIFIER - { - const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; + keys_d = generate_keys(max_keys, 8); - if(tid < max_keys) - { - // 8 values per key - keys_d[tid] = (tid % (max_keys / 8)) + 1; - } - }); + using hash_table_t = warpcore::CountingHashTable; - cudaMemcpy(keys.data(), 
keys_d, sizeof(key_t) * max_keys, D2H); CUERR + counting_benchmark( + keys_d, max_keys, {max_keys}, {0.9}, print_headers); - cudaFree(keys_d); CUERR - } - - counting_benchmark(keys, 0.9, dev_id); + cudaFree(keys_d); CUERR } \ No newline at end of file diff --git a/benchmark/src/multi_bucket_benchmark.cu b/benchmark/src/multi_bucket_benchmark.cu new file mode 100644 index 0000000..3c4f8a1 --- /dev/null +++ b/benchmark/src/multi_bucket_benchmark.cu @@ -0,0 +1,186 @@ +#include "common.cuh" +#include "warpcore.cuh" +#include "../../ext/hpc_helpers/include/io_helpers.h" +#include +#include +#include + +template +HOSTQUALIFIER INLINEQUALIFIER +void multi_value_benchmark( + const typename HashTable::key_type * keys_d, + const uint64_t max_keys, + std::vector input_sizes, + std::vector load_factors, + bool print_headers = true, + uint8_t iters = 5, + std::chrono::milliseconds thermal_backoff = std::chrono::milliseconds(100)) +{ + using index_t = typename HashTable::index_type; + using key_t = typename HashTable::key_type; + using value_t = typename HashTable::value_type; + + const uint64_t max_unique_size = num_unique(keys_d, max_keys); + + key_t* query_keys_d = nullptr; + cudaMalloc(&query_keys_d, sizeof(key_t)*max_keys); CUERR + value_t* values_d = nullptr; + cudaMalloc(&values_d, sizeof(value_t)*max_keys); CUERR + index_t * offsets_d = nullptr; + cudaMalloc(&offsets_d, sizeof(index_t)*(max_keys+1)); CUERR + + cudaMemset(values_d, 1, sizeof(value_t)*max_keys); CUERR + + const auto max_input_size = + *std::max_element(input_sizes.begin(), input_sizes.end()); + const auto min_load_factor = + *std::min_element(load_factors.begin(), load_factors.end()); + + if(max_input_size > max_keys) + { + std::cerr << "Maximum input size exceeded." 
<< std::endl; + exit(1); + } + + const float bucket_factor = + float(sizeof(key_t) + sizeof(value_t)) / + (sizeof(key_t) + sizeof(value_t)*HashTable::bucket_size()); + + if(!sufficient_memory_oa(max_input_size * bucket_factor / min_load_factor)) + { + std::cerr << "Not enough GPU memory." << std::endl; + exit(1); + } + + for(auto size : input_sizes) + { + for(auto load : load_factors) + { + const std::uint64_t capacity = size * bucket_factor / load; + + HashTable hash_table(capacity); + + Output output; + output.sample_size = size; + output.key_capacity = hash_table.capacity(); + output.value_capacity = hash_table.value_capacity(); + + output.insert_ms = benchmark_insert( + hash_table, keys_d, values_d, size, + iters, thermal_backoff); + + // std::cerr << "keys in table: " << hash_table.num_keys() << '\n'; + + // auto key_set = hash_table.get_key_set(); + // std::cerr << "keys in set: " << key_set.size() << '\n'; + + output.query_ms = benchmark_query_multi( + hash_table, query_keys_d, size, + offsets_d, values_d, + iters, thermal_backoff); + + // output.query_ms = benchmark_query_unique( + // hash_table, query_keys_d, offsets_d, values_d, + // iters, thermal_backoff); + + output.key_load_factor = hash_table.key_load_factor(); + output.value_load_factor = hash_table.value_load_factor(); + output.density = hash_table.storage_density(); + output.relative_density = hash_table.relative_storage_density(); + output.status = hash_table.pop_status(); + + if(print_headers) + output.print_with_headers(); + else + output.print_without_headers(); + } + } + + cudaFree(query_keys_d); CUERR + cudaFree(values_d); CUERR + cudaFree(offsets_d); CUERR +} + +int main(int argc, char* argv[]) +{ + using namespace warpcore; + + using key_t = std::uint32_t; + using value_t = std::uint32_t; + + const uint64_t max_keys = 1UL << 27; + + const bool print_headers = true; + + uint64_t dev_id = 0; + if(argc > 2) dev_id = std::atoi(argv[2]); + cudaSetDevice(dev_id); CUERR + + key_t * keys_d = 
nullptr; + if(argc > 1) + keys_d = load_keys(argv[1], max_keys); + else + keys_d = generate_keys(max_keys, 8); + + using mb1_hash_table_t = MultiBucketHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::empty_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>>; + + using mb2_hash_table_t = MultiBucketHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::empty_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>>; + + using mb4_hash_table_t = MultiBucketHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::empty_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>>; + + using mb8_hash_table_t = MultiBucketHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::empty_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>>; + + multi_value_benchmark( + keys_d, max_keys, + {max_keys}, + {0.8}, + print_headers); + + multi_value_benchmark( + keys_d, max_keys, + {max_keys}, + {0.8}, + print_headers); + + multi_value_benchmark( + keys_d, max_keys, + {max_keys}, + {0.8}, + print_headers); + + multi_value_benchmark( + keys_d, max_keys, + {max_keys}, + {0.8}, + print_headers); + + cudaFree(keys_d); CUERR +} diff --git a/benchmark/src/multi_value_benchmark.cu b/benchmark/src/multi_value_benchmark.cu index 4b70994..32a7022 100755 --- a/benchmark/src/multi_value_benchmark.cu +++ b/benchmark/src/multi_value_benchmark.cu @@ -1,208 +1,92 @@ -#include -#include -#include -#include -#include -#include +#include "common.cuh" #include "warpcore.cuh" #include "../../ext/hpc_helpers/include/io_helpers.h" +#include +#include +#include -template -bool sufficient_memory(size_t size, float load, float headroom_factor = 1.1) -{ - const size_t capacity = size/load; - const size_t key_val_bytes = sizeof(Key)+sizeof(Value); - const size_t table_bytes = 
key_val_bytes*capacity; - const size_t io_bytes = key_val_bytes*size; - const size_t total_bytes = (table_bytes+io_bytes)*headroom_factor; - - size_t bytes_free, bytes_total; - cudaMemGetInfo(&bytes_free, &bytes_total); CUERR - - return (total_bytes <= bytes_free); -} - -uint64_t memory_partition(float factor = 0.4) -{ - size_t bytes_free, bytes_total; - cudaMemGetInfo(&bytes_free, &bytes_total); CUERR - - return bytes_free * factor; -} - -template -uint64_t num_unique(const std::vector& v) noexcept -{ - T * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(T) * v.size()); CUERR - cudaMemcpy(keys_d, v.data(), sizeof(T) * v.size(), H2D); CUERR - - auto set = warpcore::HashSet(v.size()); - - set.insert(keys_d, v.size()); - - cudaFree(keys_d); - - return set.size(); -} - -template +template HOSTQUALIFIER INLINEQUALIFIER void multi_value_benchmark( - const std::vector& keys, - std::vector input_sizes = {(1UL<<27)}, - std::vector load_factors = {0.8}, - uint64_t dev_id = 0, + const typename HashTable::key_type * keys_d, + const uint64_t max_keys, + std::vector input_sizes, + std::vector load_factors, bool print_headers = true, uint8_t iters = 5, std::chrono::milliseconds thermal_backoff = std::chrono::milliseconds(100)) { - cudaSetDevice(dev_id); CUERR - using index_t = typename HashTable::index_type; using key_t = typename HashTable::key_type; using value_t = typename HashTable::value_type; + const uint64_t max_unique_size = num_unique(keys_d, max_keys); + + key_t* query_keys_d = nullptr; + cudaMalloc(&query_keys_d, sizeof(key_t)*max_keys); CUERR + value_t* values_d = nullptr; + cudaMalloc(&values_d, sizeof(value_t)*max_keys); CUERR + index_t * offsets_d = nullptr; + cudaMalloc(&offsets_d, sizeof(index_t)*(max_keys+1)); CUERR + + cudaMemset(values_d, 1, sizeof(value_t)*max_keys); CUERR + const auto max_input_size = *std::max_element(input_sizes.begin(), input_sizes.end()); const auto min_load_factor = *std::min_element(load_factors.begin(), load_factors.end()); - 
if(max_input_size > keys.size()) + if(max_input_size > max_keys) { std::cerr << "Maximum input size exceeded." << std::endl; exit(1); } - if(!sufficient_memory(max_input_size, min_load_factor)) + if(!sufficient_memory_oa(max_input_size / min_load_factor)) { std::cerr << "Not enough GPU memory." << std::endl; exit(1); } - key_t* keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t)*max_input_size); CUERR - key_t* unique_keys_d = nullptr; - cudaMalloc(&unique_keys_d, sizeof(key_t)*max_input_size); CUERR - value_t* values_d = nullptr; - cudaMalloc(&values_d, sizeof(value_t)*max_input_size); CUERR - index_t * offsets_d = nullptr; - cudaMalloc(&offsets_d, sizeof(index_t)*(max_input_size+1)); CUERR - - cudaMemcpy(keys_d, keys.data(), sizeof(key_t)*max_input_size, H2D); CUERR - for(auto size : input_sizes) { for(auto load : load_factors) { - const std::uint64_t capacity = size / load; + const std::uint64_t capacity = float(size) / BucketSize / load; HashTable hash_table(capacity); - std::vector insert_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - hash_table.init(); - cudaEvent_t insert_start, insert_stop; - float t; - cudaEventCreate(&insert_start); - cudaEventCreate(&insert_stop); - cudaEventRecord(insert_start, 0); - hash_table.insert(keys_d, values_d, size); - cudaEventRecord(insert_stop, 0); - cudaEventSynchronize(insert_stop); - cudaEventElapsedTime(&t, insert_start, insert_stop); - cudaDeviceSynchronize(); CUERR - insert_times[i] = t; - std::this_thread::sleep_for (thermal_backoff); - } - const float insert_time = - *std::min_element(insert_times.begin(), insert_times.end()); + Output output; + output.sample_size = size; + output.key_capacity = hash_table.capacity(); - index_t key_size_out = 0; - index_t value_size_out = 0; + output.insert_ms = benchmark_insert( + hash_table, keys_d, values_d, size, + iters, thermal_backoff); - hash_table.retrieve_all_keys(unique_keys_d, key_size_out); + output.query_ms = benchmark_query_multi( + hash_table, 
query_keys_d, size, + offsets_d, values_d, + iters, thermal_backoff); - std::vector query_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - cudaEvent_t query_start, query_stop; - float t; - cudaEventCreate(&query_start); - cudaEventCreate(&query_stop); - cudaEventRecord(query_start, 0); - hash_table.retrieve( - unique_keys_d, - key_size_out, - offsets_d, - offsets_d+1, - values_d, - value_size_out); - cudaEventRecord(query_stop, 0); - cudaEventSynchronize(query_stop); - cudaEventElapsedTime(&t, query_start, query_stop); - cudaDeviceSynchronize(); CUERR - query_times[i] = t; - std::this_thread::sleep_for(thermal_backoff); - } - const float query_time = - *std::min_element(query_times.begin(), query_times.end()); + // output.query_ms = benchmark_query_unique( + // hash_table, query_keys_d, offsets_d, values_d, + // iters, thermal_backoff); - const uint64_t total_bytes = (sizeof(key_t) + sizeof(value_t))*size; - uint64_t ips = size/(insert_time/1000); - uint64_t qps = size/(query_time/1000); - float itp = helpers::B2GB(total_bytes) / (insert_time/1000); - float qtp = helpers::B2GB(total_bytes) / (query_time/1000); - float actual_load = hash_table.load_factor(); - warpcore::Status status = hash_table.pop_status(); + output.key_load_factor = hash_table.load_factor(); + output.density = output.key_load_factor; + output.status = hash_table.pop_status(); if(print_headers) - { - const char d = ' '; - - std::cout << "N=" << size << std::fixed - << d << "C=" << capacity - << d << "bits_key=" << sizeof(key_t)*CHAR_BIT - << d << "bits_value=" << sizeof(value_t)*CHAR_BIT - << d << "mb_keys=" << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << "mb_values=" << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << "load=" << actual_load - << d << "density=" << actual_load - << d << "insert_ms=" << insert_time - << d << "query_ms=" << query_time - << d << "IPS=" << ips - << d << "QPS=" << qps - << d << "insert_GB/s=" << itp - << d << "query_GB/s=" << qtp - << d << 
"status=" << status << std::endl; - } + output.print_with_headers(); else - { - const char d = ' '; - - std::cout << std::fixed - << size - << d << capacity - << d << sizeof(key_t)*CHAR_BIT - << d << sizeof(value_t)*CHAR_BIT - << d << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << actual_load - << d << actual_load - << d << insert_time - << d << query_time - << d << ips - << d << qps - << d << itp - << d << qtp - << d << status << std::endl; - } + output.print_without_headers(); } } - cudaFree(keys_d); CUERR + cudaFree(query_keys_d); CUERR cudaFree(values_d); CUERR + cudaFree(offsets_d); CUERR } int main(int argc, char* argv[]) @@ -212,52 +96,48 @@ int main(int argc, char* argv[]) using key_t = std::uint32_t; using value_t = std::uint32_t; - using hash_table_t = MultiValueHashTable< - key_t, - value_t, - defaults::empty_key(), - defaults::tombstone_key(), - defaults::probing_scheme_t, - storage::key_value::AoSStore>; - const uint64_t max_keys = 1UL << 27; - uint64_t dev_id = 0; - std::vector keys; + const bool print_headers = true; + + uint64_t dev_id = 0; if(argc > 2) dev_id = std::atoi(argv[2]); + cudaSetDevice(dev_id); CUERR + key_t * keys_d = nullptr; if(argc > 1) - { - keys = helpers::load_binary(argv[1], max_keys); - } + keys_d = load_keys(argv[1], max_keys); else - { - keys.resize(max_keys); - - key_t * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR - - helpers::lambda_kernel - <<>> - ([=] DEVICEQUALIFIER - { - const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; - - if(tid < max_keys) - { - keys_d[tid] = (tid % (max_keys / 8)) + 1; - } - }); + keys_d = generate_keys(max_keys, 8); - cudaMemcpy(keys.data(), keys_d, sizeof(key_t) * max_keys, D2H); CUERR - - cudaFree(keys_d); CUERR - } + using mv_hash_table_t = MultiValueHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>; 
- multi_value_benchmark( - keys, + // using mb_hash_table_t = MultiBucketHashTable< + // key_t, + // value_t, + // defaults::empty_key(), + // defaults::tombstone_key(), + // defaults::empty_key(), + // defaults::probing_scheme_t, + // storage::key_value::AoSStore>>; + + multi_value_benchmark( + keys_d, max_keys, {max_keys}, {0.8}, - dev_id); + print_headers); + // multi_value_benchmark( + // keys_d, max_keys, + // {max_keys}, + // {0.8}, + // print_headers); + + cudaFree(keys_d); CUERR } \ No newline at end of file diff --git a/benchmark/src/single_value_benchmark.cu b/benchmark/src/single_value_benchmark.cu index 9777ee3..6351a1d 100644 --- a/benchmark/src/single_value_benchmark.cu +++ b/benchmark/src/single_value_benchmark.cu @@ -1,66 +1,44 @@ -#include -#include -#include -#include -#include -#include +#include "common.cuh" #include "warpcore.cuh" #include "../../ext/hpc_helpers/include/io_helpers.h" - -template -bool sufficient_memory(size_t size, float load, float headroom_factor = 1.1) -{ - const size_t capacity = size/load; - const size_t key_val_bytes = sizeof(Key)+sizeof(Value); - const size_t table_bytes = key_val_bytes*capacity; - const size_t io_bytes = key_val_bytes*size; - const size_t total_bytes = (table_bytes+io_bytes)*headroom_factor; - - size_t bytes_free, bytes_total; - cudaMemGetInfo(&bytes_free, &bytes_total); CUERR - - return (total_bytes <= bytes_free); -} +#include +#include +#include template HOSTQUALIFIER INLINEQUALIFIER void single_value_benchmark( - const std::vector& keys, - uint64_t dev_id = 0, + const typename HashTable::key_type * keys_d, + const uint64_t max_keys, + std::vector input_sizes, + std::vector load_factors, bool print_headers = true, - std::vector input_sizes = {(1UL<<27)}, - std::vector load_factors = {0.8}, uint8_t iters = 5, std::chrono::milliseconds thermal_backoff = std::chrono::milliseconds(100)) { - cudaSetDevice(dev_id); CUERR - using key_t = typename HashTable::key_type; using value_t = typename 
HashTable::value_type; + value_t* values_d = nullptr; + cudaMalloc(&values_d, sizeof(value_t)*max_keys); CUERR + const auto max_input_size = *std::max_element(input_sizes.begin(), input_sizes.end()); const auto min_load_factor = *std::min_element(load_factors.begin(), load_factors.end()); - if(max_input_size > keys.size()) + if(max_input_size > max_keys) { std::cerr << "Maximum input size exceeded." << std::endl; exit(1); } - if(!sufficient_memory(max_input_size, min_load_factor)) + if(!sufficient_memory_oa(max_input_size / min_load_factor)) { std::cerr << "Not enough GPU memory." << std::endl; exit(1); } - key_t* keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t)*max_input_size); CUERR - value_t* values_d = nullptr; - cudaMalloc(&values_d, sizeof(value_t)*max_input_size); CUERR - cudaMemcpy(keys_d, keys.data(), sizeof(key_t)*max_input_size, H2D); CUERR - for(auto size : input_sizes) { for(auto load : load_factors) @@ -69,96 +47,29 @@ void single_value_benchmark( HashTable hash_table(capacity); - std::vector insert_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - hash_table.init(); - cudaEvent_t insert_start, insert_stop; - float t; - cudaEventCreate(&insert_start); - cudaEventCreate(&insert_stop); - cudaEventRecord(insert_start, 0); - hash_table.insert(keys_d, values_d, size); - cudaEventRecord(insert_stop, 0); - cudaEventSynchronize(insert_stop); - cudaEventElapsedTime(&t, insert_start, insert_stop); - cudaDeviceSynchronize(); CUERR - insert_times[i] = t; - std::this_thread::sleep_for (thermal_backoff); - } - const float insert_time = - *std::min_element(insert_times.begin(), insert_times.end()); - - std::vector query_times(iters); - for(uint64_t i = 0; i < iters; i++) - { - cudaEvent_t query_start, query_stop; - float t; - cudaEventCreate(&query_start); - cudaEventCreate(&query_stop); - cudaEventRecord(query_start, 0); - hash_table.retrieve(keys_d, size, values_d); - cudaEventRecord(query_stop, 0); - cudaEventSynchronize(query_stop); - 
cudaEventElapsedTime(&t, query_start, query_stop); - cudaDeviceSynchronize(); CUERR - query_times[i] = t; - std::this_thread::sleep_for(thermal_backoff); - } - const float query_time = - *std::min_element(query_times.begin(), query_times.end()); - - const uint64_t total_bytes = (sizeof(key_t) + sizeof(value_t))*size; - uint64_t ips = size/(insert_time/1000); - uint64_t qps = size/(query_time/1000); - float itp = helpers::B2GB(total_bytes) / (insert_time/1000); - float qtp = helpers::B2GB(total_bytes) / (query_time/1000); - float actual_load = hash_table.load_factor(); - warpcore::Status status = hash_table.pop_status(); + Output output; + output.sample_size = size; + output.key_capacity = hash_table.capacity(); + + output.insert_ms = benchmark_insert( + hash_table, keys_d, values_d, size, + iters, thermal_backoff); + + output.query_ms = benchmark_query( + hash_table, keys_d, values_d, size, + iters, thermal_backoff); + + output.key_load_factor = hash_table.load_factor(); + output.density = output.key_load_factor; + output.status = hash_table.pop_status(); if(print_headers) - { - const char d = ' '; - - std::cout << "N=" << size << std::fixed - << d << "C=" << capacity - << d << "bits_key=" << sizeof(key_t)*CHAR_BIT - << d << "bits_value=" << sizeof(value_t)*CHAR_BIT - << d << "mb_keys=" << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << "mb_values=" << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << "load=" << actual_load - << d << "insert_ms=" << insert_time - << d << "query_ms=" << query_time - << d << "IPS=" << ips - << d << "QPS=" << qps - << d << "insert_GB/s=" << itp - << d << "query_GB/s=" << qtp - << d << "status=" << status << std::endl; - } + output.print_with_headers(); else - { - const char d = ' '; - - std::cout << std::fixed - << size - << d << capacity - << d << sizeof(key_t)*CHAR_BIT - << d << sizeof(value_t)*CHAR_BIT - << d << uint64_t(helpers::B2MB(sizeof(key_t)*size)) - << d << uint64_t(helpers::B2MB(sizeof(value_t)*size)) - << d << 
actual_load - << d << insert_time - << d << query_time - << d << ips - << d << qps - << d << itp - << d << qtp - << d << status << std::endl; - } + output.print_without_headers(); } } - cudaFree(keys_d); CUERR cudaFree(values_d); CUERR } @@ -169,47 +80,30 @@ int main(int argc, char* argv[]) using key_t = std::uint32_t; using value_t = std::uint32_t; - using hash_table_t = SingleValueHashTable< - key_t, - value_t, - defaults::empty_key(), - defaults::tombstone_key(), - defaults::probing_scheme_t, - storage::key_value::AoSStore>; + const uint64_t max_keys = 1UL << 27; - const uint64_t max_keys = 1UL << 28; - uint64_t dev_id = 0; - std::vector keys; + const bool print_headers = true; + uint64_t dev_id = 0; if(argc > 2) dev_id = std::atoi(argv[2]); + cudaSetDevice(dev_id); CUERR + key_t * keys_d = nullptr; if(argc > 1) - { - keys = helpers::load_binary(argv[1], max_keys); - } + keys_d = load_keys(argv[1], max_keys); else - { - keys.resize(max_keys); - - key_t * keys_d = nullptr; - cudaMalloc(&keys_d, sizeof(key_t) * max_keys); CUERR - - helpers::lambda_kernel - <<>> - ([=] DEVICEQUALIFIER - { - const uint64_t tid = blockDim.x * blockIdx.x + threadIdx.x; - - if(tid < max_keys) - { - keys_d[tid] = tid + 1; - } - }); + keys_d = generate_keys(max_keys, 1); - cudaMemcpy(keys.data(), keys_d, sizeof(key_t) * max_keys, D2H); CUERR + using hash_table_t = SingleValueHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>; - cudaFree(keys_d); CUERR - } + single_value_benchmark( + keys_d, max_keys, {max_keys}, {0.8}, print_headers); - single_value_benchmark(keys, dev_id, true); -} \ No newline at end of file + cudaFree(keys_d); CUERR +} diff --git a/examples/Makefile b/examples/Makefile index b42d6e2..48de67e 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -7,7 +7,9 @@ OPT := 3 CCFLAGS := -O$(OPT) -std=$(STD) -Wall -Wextra -fopenmp -DNDEBUG XCCFLAGS := $(addprefix -Xcompiler 
,$(CCFLAGS)) NVCCGENCODE = -gencode arch=compute_70,code=sm_70 -NVCCFLAGS := -O$(OPT) -std=$(STD) -ccbin $(CC) $(XCCFLAGS) $(NVCCGENCODE) --expt-extended-lambda +NVCCFLAGS := -O$(OPT) -std=$(STD) -ccbin $(CC) $(XCCFLAGS) $(NVCCGENCODE) --expt-extended-lambda --expt-relaxed-constexpr + +NVCCFLAGS += -lineinfo -g -DNDEBUG #profile INCS := $(foreach dir, $(INCDIRS), $(wildcard $(dir)/*.cuh $(dir)/*.h $(dir)/*.hpp)) INCPARAMS := $(addprefix -I, $(INCDIRS)) @@ -23,6 +25,9 @@ advanced_usage_from_device: ${INCS} | bin multi_value_hash_table: ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/multi_value_hash_table.cu -o bin/multi_value_hash_table.out +multi_bucket_hash_table: ${INCS} | bin + $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/multi_bucket_hash_table.cu -o bin/multi_bucket_hash_table.out + bucket_list_hash_table: ${INCS} | bin $(NVCC) $(NVCCFLAGS) $(INCPARAMS) src/bucket_list_hash_table.cu -o bin/bucket_list_hash_table.out diff --git a/examples/src/multi_bucket_hash_table.cu b/examples/src/multi_bucket_hash_table.cu new file mode 100644 index 0000000..445097c --- /dev/null +++ b/examples/src/multi_bucket_hash_table.cu @@ -0,0 +1,250 @@ +#include +#include +#include +#include +#include "../../ext/hpc_helpers/include/timers.cuh" + +int main () +{ + using namespace warpcore; + + using key_t = std::uint32_t; + using value_t = std::uint64_t; + + using hash_table_t = MultiBucketHashTable< + key_t, + value_t, + defaults::empty_key(), + defaults::tombstone_key(), + defaults::empty_key(), + defaults::probing_scheme_t, + storage::key_value::AoSStore>>; + using status_t = typename hash_table_t::status_type; + using status_handler_t = typename status_handlers::ReturnStatus; + + const index_t size_unique_keys = 1UL << 5; + const index_t size_values_per_key = 1UL << 15; + const index_t max_values_per_key = 21; + const index_t size = size_unique_keys * size_values_per_key; + const index_t max_buckets = size_unique_keys * SDIV(max_values_per_key,4); + const float load_factor = 0.95; + 
+ helpers::GpuTimer timer("init_table"); + hash_table_t hash_table(max_buckets / load_factor, defaults::seed(), max_values_per_key); + timer.print(); + cudaDeviceSynchronize(); CUERR + + helpers::GpuTimer timer2("init_data"); + key_t * keys_unique_h = nullptr; + cudaMallocHost(&keys_unique_h, sizeof(key_t) * size_unique_keys); CUERR + key_t * keys_unique_d = nullptr; + cudaMalloc(&keys_unique_d, sizeof(key_t) * size_unique_keys); CUERR + + key_t * keys_in_h = nullptr; + cudaMallocHost(&keys_in_h, sizeof(key_t) * size); CUERR + key_t * keys_in_d = nullptr; + cudaMalloc(&keys_in_d, sizeof(key_t) * size); CUERR + + value_t * values_in_h = nullptr; + cudaMallocHost(&values_in_h, sizeof(value_t) * size); CUERR + value_t * values_in_d = nullptr; + cudaMalloc(&values_in_d, sizeof(value_t) * size); CUERR + + index_t * offsets_out_d = nullptr; + cudaMalloc(&offsets_out_d, sizeof(index_t) * (size_unique_keys+1)); CUERR + + value_t * values_out_h = nullptr; + cudaMallocHost(&values_out_h, sizeof(value_t) * size); CUERR + value_t * values_out_d = nullptr; + cudaMalloc(&values_out_d, sizeof(value_t) * size); CUERR + + status_t * status_h = nullptr; + cudaMallocHost(&status_h, sizeof(status_t) * size); CUERR + status_t * status_d = nullptr; + cudaMalloc(&status_d, sizeof(status_t) * size); CUERR + + #pragma omp parallel for + for(index_t i = 0; i < size_unique_keys; ++i) + { + keys_unique_h[i] = i + 1; + + for(index_t j = 0; j < size_values_per_key; ++j) + { + keys_in_h[i * size_values_per_key + j] = i + 1; + } + } + + std::random_device rd; + std::mt19937 g(rd()); + + std::shuffle(keys_in_h, keys_in_h + size, g); + + #pragma omp parallel for + for(index_t i= 0; i < size; ++i) + { + values_in_h[i] = keys_in_h[i]; + status_h[i] = status_t::none(); + } + + cudaMemcpy(keys_in_d, keys_in_h, sizeof(key_t)*size, H2D); CUERR + cudaMemcpy(values_in_d, values_in_h, sizeof(value_t)*size, H2D); CUERR + cudaMemcpy(status_d, status_h, sizeof(status_t)*size, H2D); CUERR + + 
cudaMemset(values_out_d, 0, sizeof(value_t)*size); CUERR + cudaMemset(offsets_out_d, 0, sizeof(index_t)*(size_unique_keys+1)); CUERR + timer2.print(); + CUERR + + const size_t batch_size = 1UL << 15; + const size_t full_batches = size / batch_size; + const size_t last_batch_size = size % batch_size; + + helpers::GpuTimer timer3("insert"); + for(size_t b=0; b( + keys_in_d + b*batch_size, + values_in_d + b*batch_size, + batch_size, + 0, + defaults::probing_length(), + status_d + b*batch_size); + } + if(last_batch_size) + { + hash_table.insert( + keys_in_d + full_batches*batch_size, + values_in_d + full_batches*batch_size, + last_batch_size, + 0, + defaults::probing_length(), + status_d + full_batches*batch_size); + } + timer3.print_throughput((sizeof(key_t)+sizeof(value_t)), size); + cudaDeviceSynchronize(); CUERR + + cudaMemcpy(status_h, status_d, sizeof(status_t)*size, D2H); CUERR + + std::cout << "table status " << hash_table.peek_status() << std::endl; + index_t errors = 0; + for(index_t i = 0; i < size; ++i) + { + status_h[i] -= status_t::duplicate_key(); + + if(status_h[i].has_any_errors()) + { + if(errors++ < 10) + std::cout << "STATUS: i " << i << " key " << keys_in_h[i] << " status " << status_h[i] << std::endl; + } + } + if(errors >= 10) + { + std::cout << "...\n" << "total errors " << errors << std::endl; + } + + std::cout << "num pairs " << size << std::endl; + std::cout << "table size " << hash_table.size() << std::endl; + std::cout << "key capacity " << hash_table.capacity() << std::endl; + std::cout << "load factor " << hash_table.key_load_factor() << std::endl; + std::cout << "expected unique keys " << size_unique_keys << std::endl; + std::cout << "actual unique keys " << hash_table.num_keys() << std::endl; + std::cout << "values per key " << size_values_per_key << std::endl; + std::cout << "total values " << size << std::endl; + std::cout << "capped values " << size_unique_keys*max_values_per_key << std::endl; + + #pragma omp parallel for + 
for(index_t i= 0; i < size; ++i) + { + status_h[i] = status_t::none(); + } + + cudaMemcpy(status_d, status_h, sizeof(status_t)*size, H2D); CUERR + + index_t value_size = 0; + + index_t count_unique_keys = 0; + hash_table.retrieve_all_keys( + keys_unique_d, + count_unique_keys); + cudaDeviceSynchronize(); CUERR + + std::cout << "retrieved keys " << count_unique_keys << std::endl; + + { + helpers::GpuTimer timer("retrieve"); + hash_table.retrieve( + keys_unique_d, + count_unique_keys, + offsets_out_d, + offsets_out_d+1, + values_out_d, + value_size, + 0, + defaults::probing_length(), + status_d); + timer.print_throughput((sizeof(key_t)+sizeof(value_t)), size); + } + cudaDeviceSynchronize(); CUERR + + std::cout << "retrieved values " << value_size << std::endl; + + helpers::lambda_kernel<<>>([=] DEVICEQUALIFIER + { + const index_t tid = blockDim.x * blockIdx.x + threadIdx.x; + + if(tid < size_unique_keys) + { + const auto key = keys_unique_d[tid]; + const auto lower = offsets_out_d[tid]; + const auto upper = offsets_out_d[tid + 1]; + + if(upper - lower != max_values_per_key) + { + printf("ERROR size values %llu (expected %llu)\n", upper - lower, max_values_per_key); + } + + for (index_t i = lower; i < upper; i++) + { + if(values_out_d[i] != key) + { + printf("ERROR expected %u got %llu\n", key, values_out_d[i]); + } + } + } + }); + + cudaDeviceSynchronize(); CUERR + + cudaMemcpy(status_h, status_d, sizeof(status_t)*size, D2H); CUERR + + std::cout << "table status " << hash_table.peek_status() << std::endl; + + errors = 0; + for(index_t i = 0; i < size; ++i) + { + + if(status_h[i].has_any()) + { + if(errors++ < 10) + std::cout << "STATUS: i " << i << " key " << keys_in_h[i] << " status " << status_h[i] << std::endl; + } + } + if(errors >= 10) + { + std::cout << "...\n" << "total errors " << errors << std::endl; + } + + cudaFreeHost(keys_unique_h); + cudaFreeHost(keys_in_h); + cudaFreeHost(values_in_h); + cudaFreeHost(values_out_h); + cudaFreeHost(status_h); + 
cudaFree(keys_unique_d); + cudaFree(keys_in_d); + cudaFree(values_in_d); + cudaFree(offsets_out_d); + cudaFree(values_out_d); + cudaFree(status_d); + + cudaDeviceSynchronize(); CUERR +} diff --git a/ext/hpc_helpers b/ext/hpc_helpers index d2c1f8b..34ebf68 160000 --- a/ext/hpc_helpers +++ b/ext/hpc_helpers @@ -1 +1 @@ -Subproject commit d2c1f8b6daea6425c96816869daf50e247a3dd64 +Subproject commit 34ebf68a22515a473ad9d73d63050f75c7bbbfd2 diff --git a/include/bloom_filter.cuh b/include/bloom_filter.cuh index c3aed74..5c1b740 100644 --- a/include/bloom_filter.cuh +++ b/include/bloom_filter.cuh @@ -260,14 +260,14 @@ public: * \param[in] group cooperative group this operation is executed in * \param[out] flag whether the key was already inside the filter before insertion */ - template< + template< index_type CGSize_ = cg_size(), class = std::enable_if_t> - DEVICEQUALIFIER INLINEQUALIFIER - bool insert_and_query( + DEVICEQUALIFIER INLINEQUALIFIER + bool insert_and_query( const key_type key, const cg::thread_block_tile& group) noexcept - { + { const index_type slot_index = ((Hasher::hash(key+seed_) % num_blocks_) * cg_size() + group.thread_rank()) % num_slots_; @@ -294,7 +294,7 @@ public: { return true; } - } + } /*! \brief get number of bits (m) * \return number of bits (m) @@ -317,11 +317,11 @@ public: /*! \brief get number of blocks * \return number of blocks */ - HOSTDEVICEQUALIFIER INLINEQUALIFIER - index_type num_blocks() const noexcept - { - return num_blocks_; - } + HOSTDEVICEQUALIFIER INLINEQUALIFIER + index_type num_blocks() const noexcept + { + return num_blocks_; + } /*! \brief get number of hash functions (k) * \return number of hash functions (k) diff --git a/include/bucket_list_hash_table.cuh b/include/bucket_list_hash_table.cuh index a8ccaa2..f558bf6 100644 --- a/include/bucket_list_hash_table.cuh +++ b/include/bucket_list_hash_table.cuh @@ -50,20 +50,20 @@ public: /*! 
\brief get empty key * \return empty key */ - HOSTDEVICEQUALIFIER INLINEQUALIFIER - static constexpr key_type empty_key() noexcept - { - return EmptyKey; - } + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr key_type empty_key() noexcept + { + return EmptyKey; + } /*! \brief get tombstone key * \return tombstone key */ - HOSTDEVICEQUALIFIER INLINEQUALIFIER - static constexpr key_type tombstone_key() noexcept - { - return TombstoneKey; - } + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr key_type tombstone_key() noexcept + { + return TombstoneKey; + } /*! \brief checks if \c key is equal to \c (EmptyKey||TombstoneKey) * \return \c bool @@ -618,11 +618,11 @@ public: * \param stream CUDA stream in which this operation is executed in * \return load factor */ - HOSTQUALIFIER INLINEQUALIFIER - float value_load_factor(const cudaStream_t stream = 0) const noexcept - { - return value_store_.load_factor(stream); - } + HOSTQUALIFIER INLINEQUALIFIER + float value_load_factor(const cudaStream_t stream = 0) const noexcept + { + return value_store_.load_factor(stream); + } /*! 
\brief get the the total number of bytes occupied by this data structure * \return bytes diff --git a/include/counting_hash_table.cuh b/include/counting_hash_table.cuh index fa63b65..46aee02 100644 --- a/include/counting_hash_table.cuh +++ b/include/counting_hash_table.cuh @@ -182,7 +182,7 @@ public: template HOSTQUALIFIER INLINEQUALIFIER void insert( - key_type * keys_in, + const key_type * keys_in, index_type num_in, cudaStream_t stream = 0, index_type probing_length = defaults::probing_length(), @@ -236,7 +236,7 @@ public: template HOSTQUALIFIER INLINEQUALIFIER void retrieve( - key_type * keys_in, + const key_type * keys_in, index_type num_in, value_type * values_out, cudaStream_t stream = 0, diff --git a/include/gpu_engine.cuh b/include/gpu_engine.cuh index b033035..03a1e9c 100644 --- a/include/gpu_engine.cuh +++ b/include/gpu_engine.cuh @@ -45,6 +45,53 @@ void for_each( } } +template< + class Func, + class Core> +GLOBALQUALIFIER +void for_each_unique_key( + Func f, + const Core core) +{ + using index_type = typename Core::index_type; + using probing_scheme_type = typename Core::probing_scheme_type; + + const index_t tid = helpers::global_thread_id(); + const index_t gid = tid / Core::cg_size(); + const auto group = + cg::tiled_partition(cg::this_thread_block()); + + if(gid < core.capacity()) + { + // for valid entry in table check if this entry is the first of its key + auto search_key = core.table_[gid].key; + if(core.is_valid_key(search_key)) + { + probing_scheme_type iter(core.capacity(), core.capacity(), group); + + for(index_type i = iter.begin(search_key, core.seed_); i != iter.end(); i = iter.next()) + { + const auto table_key = core.table_[i].key; + const auto hit = (table_key == search_key); + const auto hit_mask = group.ballot(hit); + + const auto leader = ffs(hit_mask) - 1; + + // check if search_key is the first entry for this key + if(group.thread_rank() == leader && i == gid) + { + f(table_key); + } + + if(group.any(hit)) + { + return; + } + } + 
/*! \brief counts the non-empty values of all valid table entries
 * \note intended for \c Core = \c MultiBucketHashTable
 * \note every table slot is inspected by the thread whose global id equals the slot index
 * \tparam Core hash table type
 * \param[out] num_out counter the per-block value counts are atomically added to
 * \param[in] core hash table to inspect
 */
template<class Core>
GLOBALQUALIFIER
void num_values(
    index_t * const num_out,
    const Core core)
{
    // block-local partial sum
    __shared__ index_t smem;

    const index_t tid = helpers::global_thread_id();
    const auto block = cg::this_thread_block();

    // reset the partial sum
    // NOTE: the barriers below have to be reached by every thread of the block,
    // so neither the reset nor the barriers may depend on the bounds check
    if(block.thread_rank() == 0)
    {
        smem = 0;
    }

    block.sync();

    if(tid < core.capacity() && core.is_valid_key(core.table_[tid].key))
    {
        const auto bucket = core.table_[tid].value;

        index_t value_count = 0;
        #pragma unroll
        for(int b = 0; b < core.bucket_size(); ++b)
        {
            if(bucket[b] != core.empty_value())
            {
                ++value_count;
            }
        }

        // TODO warp reduce
        atomicAdd(&smem, value_count);
    }

    // all contributions of this block have to be visible before publishing
    block.sync();

    if(block.thread_rank() == 0 && smem != 0)
    {
        atomicAdd(num_out, smem);
    }
}
/*! \brief fixed-size array of values
 * \tparam Value type of the stored values
 * \tparam BucketSize number of values held by one bucket (must not be 0)
 */
template<
    class Value,
    std::uint32_t BucketSize = 1>
struct ArrayBucket
{
    static_assert(
        BucketSize > 0,
        "invalid bucket size of 0");

    using value_type = Value;
    using index_type = std::uint32_t;

    /*! \brief get bucket size
     * \return bucket size
     */
    HOSTDEVICEQUALIFIER INLINEQUALIFIER
    static constexpr index_type bucket_size() noexcept
    {
        return BucketSize;
    }

    /*! \brief fill constructor
     * \param[in] value value every position of the bucket is set to
     */
    HOSTDEVICEQUALIFIER INLINEQUALIFIER
    explicit ArrayBucket(value_type value) noexcept
    {
        #pragma unroll
        for(index_type pos = 0; pos < BucketSize; ++pos)
        {
            values_[pos] = value;
        }
    }

    /*! \brief copy constructor (element-wise)
     * \param[in] other bucket to copy from
     */
    HOSTDEVICEQUALIFIER INLINEQUALIFIER
    ArrayBucket(const ArrayBucket& other) noexcept
    {
        #pragma unroll
        for(index_type pos = 0; pos < BucketSize; ++pos)
        {
            values_[pos] = other.values_[pos];
        }
    }

    /*! \brief copy assignment (element-wise)
     * \param[in] other bucket to copy from
     * \return reference to this bucket
     */
    HOSTDEVICEQUALIFIER INLINEQUALIFIER
    ArrayBucket& operator =(const ArrayBucket& other) noexcept
    {
        #pragma unroll
        for(index_type pos = 0; pos < BucketSize; ++pos)
        {
            values_[pos] = other.values_[pos];
        }

        return *this;
    }

    /*! \brief accessor
     * \param[in] i index to access
     * \return value at position \c i
     */
    DEVICEQUALIFIER INLINEQUALIFIER
    constexpr value_type& operator[](const index_type i) noexcept
    {
        return values_[i];
    }

    /*! \brief const accessor
     * \param[in] i index to access
     * \return value at position \c i
     */
    DEVICEQUALIFIER INLINEQUALIFIER
    constexpr const value_type& operator[](const index_type i) const noexcept
    {
        return values_[i];
    }

    value_type values_[BucketSize];
};
checks::is_key_value_storage(), + "not a valid storage type"); + + static_assert( + std::is_same::value, + "storage's key type differs from table's key type"); + + static_assert( + std::is_same::value, + "storage's value type differs from table's value type"); + + static_assert( + TempMemoryBytes >= sizeof(index_t), + "temporary storage must at least be of size index_type"); + + using temp_type = storage::CyclicStore; + +public: + using key_type = Key; + using value_type = Value; + using bucket_type = typename TableStorage::value_type; + using index_type = index_t; + using status_type = Status; + using probing_scheme_type = ProbingScheme; + + /*! \brief get empty key + * \return empty key + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr key_type empty_key() noexcept + { + return EmptyKey; + } + + /*! \brief get tombstone key + * \return tombstone key + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr key_type tombstone_key() noexcept + { + return TombstoneKey; + } + + /*! \brief get empty value + * \return empty value + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr value_type empty_value() noexcept + { + return EmptyValue; + } + + + /*! \brief get cooperative group size + * \return cooperative group size + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr index_type cg_size() noexcept + { + return ProbingScheme::cg_size(); + } + + /*! \brief get bucket size + * \return bucket size + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr index_type bucket_size() noexcept + { + return TableStorage::value_type::bucket_size(); + } + + /*! 
\brief constructor + * \param[in] min_capacity minimum number of slots in the hash table + * \param[in] seed random seed + * \param[in] max_values_per_key maximum number of values to store per key + * \param[in] no_init whether to initialize the table at construction or not + */ + HOSTQUALIFIER INLINEQUALIFIER + explicit MultiBucketHashTable( + const index_type min_capacity, + const key_type seed = defaults::seed(), + const index_type max_values_per_key = + std::numeric_limits::max(), + const bool no_init = false) noexcept : + status_(nullptr), + table_(detail::get_valid_capacity(min_capacity, cg_size())), + temp_(TempMemoryBytes / sizeof(index_type)), + seed_(seed), + max_values_per_key_(max_values_per_key), + num_keys_(nullptr), + num_occupied_(nullptr), + is_copy_(false), + is_initialized_(false) + { + cudaMalloc(&status_, sizeof(status_type)); + cudaMalloc(&num_keys_, sizeof(index_type)); + cudaMalloc(&num_occupied_, sizeof(index_type)); + + assign_status(table_.status() + temp_.status()); + + if(!no_init) init(); + } + + /*! \brief copy-constructor (shallow) + * \param[in] object to be copied + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + MultiBucketHashTable(const MultiBucketHashTable& o) noexcept : + status_(o.status_), + table_(o.table_), + temp_(o.temp_), + seed_(o.seed_), + max_values_per_key_(o.max_values_per_key_), + num_keys_(o.num_keys_), + num_occupied_(o.num_occupied_), + is_copy_(true), + is_initialized_(o.is_initialized_) + {} + + /*! 
\brief move-constructor + * \param[in] object to be moved + */ + HOSTQUALIFIER INLINEQUALIFIER + MultiBucketHashTable(MultiBucketHashTable&& o) noexcept : + status_(std::move(o.status_)), + table_(std::move(o.table_)), + temp_(std::move(o.temp_)), + seed_(std::move(o.seed_)), + max_values_per_key_(std::move(o.max_values_per_key_)), + num_keys_(std::move(o.num_keys_)), + num_occupied_(std::move(o.num_occupied_)), + is_copy_(std::move(o.is_copy_)), + is_initialized_(std::move(o.is_initialized_)) + { + o.is_copy_ = true; + } + + #ifndef __CUDA_ARCH__ + /*! \brief destructor + */ + HOSTQUALIFIER INLINEQUALIFIER + ~MultiBucketHashTable() noexcept + { + if(!is_copy_) + { + if(status_ != nullptr) cudaFree(status_); + if(num_keys_ != nullptr) cudaFree(num_keys_); + if(num_occupied_ != nullptr) cudaFree(num_occupied_); + } + } + #endif + + /*! \brief (re)initialize the hash table + * \param[in] stream CUDA stream in which this operation is executed in + */ + HOSTQUALIFIER INLINEQUALIFIER + void init(const cudaStream_t stream = 0) noexcept + { + is_initialized_ = false; + + if(!table_.status().has_not_initialized() && + !temp_.status().has_not_initialized()) + { + table_.init_keys(empty_key(), stream); + table_.init_values(bucket_type(empty_value()), stream); + + assign_status(table_.status() + temp_.status(), stream); + + cudaMemsetAsync(num_keys_, 0, sizeof(index_type), stream); + cudaMemsetAsync(num_occupied_, 0, sizeof(index_type), stream); + + is_initialized_ = true; + } + } + +private: + DEVICEQUALIFIER INLINEQUALIFIER + bool insert_into_bucket( + const index_type last_key_pos, + const value_type value_in, + const cg::thread_block_tile& group, + index_type num_values, + status_type& status) noexcept + { + #pragma unroll + for(index_type i = 0; + i < SDIV(bucket_size(),cg_size())*cg_size(); + i += cg_size()) + { + // first bucket value always written after key insert + const value_type table_value = + ((0 < group.thread_rank()) && (i + group.thread_rank() < 
bucket_size())) ? + table_[last_key_pos].value[group.thread_rank()] : + ~empty_value(); + + auto empty_value_mask = group.ballot(is_empty_value(table_value)); + + num_values += min(bucket_size(),cg_size()) - __popc(empty_value_mask); + + if(num_values >= max_values_per_key_) + { + status = status_type::duplicate_key() + + status_type::max_values_for_key_reached(); + device_join_status(status); + return true; + } + + bool success = false; + + while(empty_value_mask) + { + const auto leader = ffs(empty_value_mask) - 1; + + if(group.thread_rank() == leader) + { + const auto old = + atomicCAS(&(table_[last_key_pos].value[i+group.thread_rank()]), table_value, value_in); + + success = (old == table_value); + } + + if(group.any(success)) + { + status = (num_values > 0) ? + status_type::duplicate_key() : status_type::none(); + return true; + } + + ++num_values; + if(num_values >= max_values_per_key_) + { + status = status_type::duplicate_key() + + status_type::max_values_for_key_reached(); + device_join_status(status); + return true; + } + + empty_value_mask ^= 1UL << leader; + } + } + + return false; + } + +public: + /*! 
\brief inserts a key into the hash table + * \param[in] key_in key to insert into the hash table + * \param[in] value_in value that corresponds to \c key_in + * \param[in] group cooperative group + * \param[in] probing_length maximum number of probing attempts + * \return status (per thread) + */ + DEVICEQUALIFIER INLINEQUALIFIER + status_type insert( + const key_type key_in, + const value_type value_in, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) noexcept + { + if(!is_initialized_) + { + return status_type::not_initialized(); + } + + if(!is_valid_key(key_in)) + { + device_join_status(status_type::invalid_key()); + return status_type::invalid_key(); + } + + if(!is_valid_value(value_in)) + { + device_join_status(status_type::invalid_value()); + return status_type::invalid_value(); + } + + ProbingScheme iter(capacity(), probing_length, group); + index_type num_values_plus_bucket_size = 0; // count one bucket less + + index_type last_key_pos = std::numeric_limits::max(); + for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next()) + { + const key_type table_key = cub::ThreadLoad(&table_[i].key); + + auto empty_key_mask = group.ballot(is_empty_key(table_key)); + + const auto key_found_mask = group.ballot(table_key == key_in); + + const auto new_last_key_pos = group.shfl(i, 31 - __clz(key_found_mask)); + + last_key_pos = key_found_mask ? 
new_last_key_pos : last_key_pos; + + num_values_plus_bucket_size += bucket_size() * __popc(key_found_mask); + + // early exit + if(num_values_plus_bucket_size >= max_values_per_key_) + { + if(bucket_size() == 1) + { + // num values = num buckets, so no space left + status_type status = status_type::duplicate_key() + + status_type::max_values_for_key_reached(); + device_join_status(status); + return status; + } + else + { + status_type status = status_type::unknown_error(); + // check if space left in last bucket + insert_into_bucket(last_key_pos, value_in, group, + num_values_plus_bucket_size - bucket_size(), status); + return status; + } + } + + while(empty_key_mask) + { + status_type status; + if((bucket_size() > 1) && + (last_key_pos < std::numeric_limits::max()) && + insert_into_bucket(last_key_pos, value_in, group, + num_values_plus_bucket_size - bucket_size(), status)) + return status; + + // insert key + bool success = false; + bool key_collision = false; + + const auto leader = ffs(empty_key_mask) - 1; + + if(group.thread_rank() == leader) + { + const auto old = + atomicCAS(&(table_[i].key), table_key, key_in); + + success = (old == table_key); + key_collision = (old == key_in); + + if(success) + { + // relaxed write to first slot in value array + table_[i].value[0] = value_in; + + helpers::atomicAggInc(num_occupied_); + + if(num_values_plus_bucket_size == 0) + { + helpers::atomicAggInc(num_keys_); + } + } + } + + if(group.any(success)) + { + return (num_values_plus_bucket_size > 0) ? 
+ status_type::duplicate_key() : status_type::none(); + } + + key_collision = group.any(key_collision); + num_values_plus_bucket_size += key_collision*bucket_size(); + + if(bucket_size() == 1) + { + if(num_values_plus_bucket_size >= max_values_per_key_) + { + status_type status = status_type::duplicate_key() + + status_type::max_values_for_key_reached(); + device_join_status(status); + return status; + } + } + else + { + // check position in next iteration + const auto new_last_key_pos = group.shfl(i, leader); + last_key_pos = key_collision ? new_last_key_pos : last_key_pos; + } + + empty_key_mask ^= 1UL << leader; + } + } + + status_type status; + if((bucket_size() > 1) && + (last_key_pos < std::numeric_limits::max()) && + insert_into_bucket(last_key_pos, value_in, group, + num_values_plus_bucket_size - bucket_size(), status)) + return status; + + status = (num_values_plus_bucket_size > 0) ? + status_type::probing_length_exceeded() + status_type::duplicate_key() : + status_type::probing_length_exceeded(); + device_join_status(status); + return status; + } + + /*! 
\brief insert a set of keys into the hash table + * \tparam StatusHandler handles returned status per key (see \c status_handlers) + * \param[in] keys_in pointer to keys to insert into the hash table + * \param[in] values_in corresponds values to \c keys_in + * \param[in] num_in number of keys to insert + * \param[in] stream CUDA stream in which this operation is executed in + * \param[in] probing_length maximum number of probing attempts + * \param[out] status_out status information per key + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void insert( + const key_type * const keys_in, + const value_type * const values_in, + const index_type num_in, + const cudaStream_t stream = 0, + const index_type probing_length = defaults::probing_length(), + typename StatusHandler::base_type * const status_out = nullptr) noexcept + { + static_assert( + checks::is_status_handler(), + "not a valid status handler type"); + + if(!is_initialized_) return; + + kernels::insert + <<>> + (keys_in, values_in, num_in, *this, probing_length, status_out); + } + + /*! 
\brief retrieves all values to a corresponding key + * \param[in] key_in key to retrieve from the hash table + * \param[out] values_out values for \c key_in + * \param[out] num_out number of retrieved values + * \param[in] group cooperative group + * \param[in] probing_length maximum number of probing attempts + * \return status (per thread) + */ + DEVICEQUALIFIER INLINEQUALIFIER + status_type retrieve( + const key_type key_in, + value_type * const values_out, + index_type& num_out, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) const noexcept + { + if(values_out == nullptr) + { + const auto status = num_values(key_in, num_out, group, probing_length); + device_join_status(status_type::dry_run()); + return status_type::dry_run() + status; + } + else + { + return for_each([=, *this] DEVICEQUALIFIER + (const key_type /* key */, const value_type& value, const index_type index) + { + values_out[index] = value; + }, + key_in, + num_out, + group, + probing_length); + } + } + + /*! 
\brief retrieve a set of keys from the hash table + * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided + * \note \c end_offsets_out can be \c begin_offsets_out+1 + * \tparam StatusHandler handles returned status per key (see \c status_handlers) + * \param[in] keys_in pointer to keys to retrieve from the hash table + * \param[in] num_in number of keys to retrieve + * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out + * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out + * \param[out] num_out total number of values retrieved by this operation + * \param[in] stream CUDA stream in which this operation is executed in + * \param[in] probing_length maximum number of probing attempts + * \param[out] status_out status information (per key) + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void retrieve( + const key_type * const keys_in, + const index_type num_in, + index_type * const begin_offsets_out, + index_type * const end_offsets_out, + value_type * const values_out, + index_type& num_out, + const cudaStream_t stream = 0, + const index_type probing_length = defaults::probing_length(), + typename StatusHandler::base_type * const status_out = nullptr) const noexcept + { + static_assert( + checks::is_status_handler(), + "not a valid status handler type"); + + if(!is_initialized_) return; + + // cub::DeviceScan::InclusiveSum takes input sizes of type int + if(num_in > std::numeric_limits::max()) + { + join_status(status_type::index_overflow(), stream); + + return; + } + + num_values( + keys_in, + num_in, + num_out, + end_offsets_out, + stream, + probing_length); + + if(values_out != nullptr) + { + index_type temp_bytes = num_out * sizeof(value_type); + + cub::DeviceScan::InclusiveSum( + values_out, + temp_bytes, + end_offsets_out, + end_offsets_out, + num_in, + stream); + + cudaMemsetAsync(begin_offsets_out, 0, 
sizeof(index_type), stream); + + if(end_offsets_out != begin_offsets_out + 1) + { + cudaMemcpyAsync( + begin_offsets_out + 1, + end_offsets_out, + sizeof(index_type) * (num_in - 1), + D2D, + stream); + } + + kernels::retrieve + <<>> + ( + keys_in, + num_in, + begin_offsets_out, + end_offsets_out, + values_out, + *this, + probing_length, + status_out); + } + else + { + if(status_out != nullptr) + { + helpers::lambda_kernel + <<>> + ([=, *this] DEVICEQUALIFIER + { + const index_type tid = helpers::global_thread_id(); + + if(tid < num_in) + { + StatusHandler::handle(Status::dry_run(), status_out, tid); + } + }); + } + + join_status(status_type::dry_run(), stream); + } + + if(stream == 0) + { + cudaStreamSynchronize(stream); + } + } + + /*! \brief retrieves all elements from the hash table + * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided + * \note this method implements a multi-stage dry-run mode + * \param[out] keys_out pointer to the set of unique keys + * \param[out] num_keys_out number of unique keys + * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out + * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out + * \param[out] values_out array which holds all retrieved values + * \param[out] num_values_out total number of values retrieved by this operation + * \param[in] stream CUDA stream in which this operation is executed in + */ + HOSTQUALIFIER INLINEQUALIFIER + void retrieve_all( + key_type * const keys_out, + index_type& num_keys_out, + index_type * const begin_offsets_out, + index_type * const end_offsets_out, + value_type * const values_out, + value_type& num_values_out, + const cudaStream_t stream = 0) const noexcept + { + if(!is_initialized_) return; + + retrieve_all_keys(keys_out, num_keys_out, stream); + + if(keys_out != nullptr) + { + retrieve( + keys_out, + num_keys_out, + begin_offsets_out, 
+ end_offsets_out, + values_out, + num_values_out, + stream); + } + + if(stream == 0) + { + cudaStreamSynchronize(stream); + } + } + + /*! \brief retrieve all unqiue keys + * \info this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided + * \param[out] keys_out retrieved unqiue keys + * \param[out] num_out numof unique keys + * \param[in] stream CUDA stream in which this operation is executed in + */ + HOSTQUALIFIER INLINEQUALIFIER + void retrieve_all_keys( + key_type * const keys_out, + index_type& num_out, + const cudaStream_t stream = 0) const noexcept + { + if(!is_initialized_) return; + + if(keys_out != nullptr) + { + index_type * const tmp = temp_.get(); + cudaMemsetAsync(tmp, 0, sizeof(index_type), stream); + + kernels::for_each_unique_key + <<>> + ([=] DEVICEQUALIFIER (const key_type& key) + { + index_type out = helpers::atomicAggInc(tmp); + keys_out[out] = key; + }, *this); + + cudaMemcpyAsync(&num_out, tmp, sizeof(index_type), D2H, stream); + + if(stream == 0) + { + cudaStreamSynchronize(stream); + } + } + else + { + num_out = num_keys(stream); + join_status(status_type::dry_run(), stream); + } + } + + /*! \brief applies a funtion over all values of a specified key + * \tparam Func type of map i.e. 
CUDA device lambda + * \param[in] f map to apply; invoked as \c f(key_in, value, index) for every non-empty value found + * \param[in] key_in key to consider + * \param[out] num_values_out number of values associated to \c key_in (left untouched if the table is not initialized) + * \param[in] group cooperative group + * \param[in] probing_length maximum number of probing attempts + * \return status (per thread) + */ + template + DEVICEQUALIFIER INLINEQUALIFIER + status_type for_each( + Func f, + const key_type key_in, + index_type& num_values_out, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) const noexcept + { + if(!is_initialized_) return status_type::not_initialized(); + + if(!is_valid_key(key_in)) + { + num_values_out = 0; + device_join_status(status_type::invalid_key()); + return status_type::invalid_key(); + } + + ProbingScheme iter(capacity(), min(probing_length, capacity()), group); + + index_type num = 0; + for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next()) + { + const auto table_key = table_[i].key; + const auto hit = (table_key == key_in); + const auto hit_mask = group.ballot(hit); + + index_type num_empty = 0; + if(hit) + { + const auto j = + num + bucket_size() * __popc(hit_mask & ((1U << group.thread_rank()) - 1)); + + const auto bucket = table_[i].value; + #pragma unroll + for(index_type b = 0; b < bucket_size(); ++b) { + const auto& value = bucket[b]; + // if(value != empty_value() && j+b < max_values_per_key_) + if(value != empty_value()) + f(key_in, value, j+b); + else + ++num_empty; + } + } + + // get num_empty from the last matching bucket in the group + // if there is no hit this returns 0 from the last thread + num_empty = group.shfl(num_empty, 31 - __clz(hit_mask)); + + num += bucket_size() * __popc(hit_mask) - num_empty; + + if(group.any(is_empty_key(table_key) || num >= max_values_per_key_)) + { + num_values_out = num; + + if(num == 0) + { + device_join_status(status_type::key_not_found()); + return status_type::key_not_found(); + } + else + { + return status_type::none(); + } + } + } + + num_values_out
= num; + device_join_status(status_type::probing_length_exceeded()); + return status_type::probing_length_exceeded(); + } + + /*! \brief applies a function over all key bucket pairs inside the table + * \tparam Func type of map i.e. CUDA device lambda + * \param[in] f map to apply + * \param[in] stream CUDA stream in which this operation is executed + * \param[in] smem_bytes size of dynamic shared memory to reserve for this execution + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void for_each_bucket( + Func f, // TODO const? + const cudaStream_t stream = 0, + const index_type smem_bytes = 0) const noexcept + { + if(!is_initialized_) return; + + kernels::for_each + <<>> + (f, *this); + } + + /*! \brief applies a function over all key value pairs inside the table + * \tparam Func type of map i.e. CUDA device lambda + * \param[in] f map to apply; invoked as \c f(key, value) for every non-empty value of a bucket + * \param[in] stream CUDA stream in which this operation is executed + * \param[in] smem_bytes size of dynamic shared memory to reserve for this execution + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void for_each_value( + Func f, // TODO const? + const cudaStream_t stream = 0, + const index_type smem_bytes = 0) const noexcept + { + if(!is_initialized_) return; + + auto bucket_f = [=, f=std::move(f)] DEVICEQUALIFIER + (const key_type key, const bucket_type bucket) mutable + { + #pragma unroll + for(index_type b = 0; b < bucket_size(); ++b) { + const auto& value = bucket[b]; + if(value != empty_value()) + f(key, value); + } + }; + + kernels::for_each + <<>> + (bucket_f, *this); + } + + /*! \brief applies a function over all key value pairs of a given set of keys + * \tparam Func type of map i.e.
CUDA device lambda + * \tparam StatusHandler handles returned status per key (see \c status_handlers) + * \param[in] f map to apply + * \param[in] keys_in keys to consider + * \param[in] num_in number of keys + * \param[in] stream CUDA stream in which this operation is executed + * \param[in] probing_length maximum number of probing attempts (NOTE(review): not passed to the kernel arguments in the launch below - confirm whether this is intended) + * \param[out] status_out status information (per key) + * \param[in] smem_bytes size of dynamic shared memory to reserve for this execution + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void for_each( + Func f, // TODO const? + const key_type * const keys_in, + const index_type num_in, + const cudaStream_t stream = 0, + const index_type probing_length = defaults::probing_length(), + typename StatusHandler::base_type * const status_out = nullptr, + const index_type smem_bytes = 0) const noexcept + { + static_assert( + checks::is_status_handler(), + "not a valid status handler type"); + + if(!is_initialized_) return; + + kernels::for_each + <<>> + (f, keys_in, num_in, *this, status_out); + } + + /*! \brief number of unique keys inside the table + * \param[in] stream CUDA stream in which this operation is executed + * \return number of unique keys (this call synchronizes \c stream before returning) + */ + HOSTQUALIFIER INLINEQUALIFIER + index_type num_keys(const cudaStream_t stream = 0) const noexcept + { + index_type num = 0; + + cudaMemcpyAsync(&num, num_keys_, sizeof(index_type), D2H, stream); + + cudaStreamSynchronize(stream); + + return num; + } + + /*! \brief number of occupied slots in the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return the number of occupied slots (this call synchronizes \c stream before returning) + */ + HOSTQUALIFIER INLINEQUALIFIER + index_type num_occupied(const cudaStream_t stream = 0) const noexcept + { + index_type num = 0; + + cudaMemcpyAsync(&num, num_occupied_, sizeof(index_type), D2H, stream); + + cudaStreamSynchronize(stream); + + return num; + } + + /*!
\brief number of values associated to a specific key + * \param[in] key_in key to be probed + * \param[out] num_out number of values associated to \c key_in + * \param[in] group cooperative group + * \param[in] probing_length maximum number of probing attempts + * \return status (per thread) + */ + DEVICEQUALIFIER INLINEQUALIFIER + status_type num_values( + const key_type key_in, + index_type& num_out, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) const noexcept + { + return for_each([=] DEVICEQUALIFIER ( + const key_type /* key */, + const value_type& /* value */, + const index_type /* index */) {}, + key_in, + num_out, + group, + probing_length); + } + + /*! \brief number of values associated to a set of keys + * \note this function returns only \c num_out if \c num_per_key_out==nullptr + * \tparam StatusHandler handles returned status per key (see \c status_handlers) + * \param[in] keys_in keys to consider + * \param[in] num_in number of keys + * \param[out] num_out total number of values (written by an asynchronous copy; this method only synchronizes if \c stream is the default stream) + * \param[out] num_per_key_out number of values per key + * \param[in] stream CUDA stream in which this operation is executed + * \param[in] probing_length maximum number of probing attempts + * \param[out] status_out status information (per key) + */ + template + HOSTQUALIFIER INLINEQUALIFIER + void num_values( + const key_type * const keys_in, + const index_type num_in, + index_type& num_out, + index_type * const num_per_key_out = nullptr, + const cudaStream_t stream = 0, + const index_type probing_length = defaults::probing_length(), + typename StatusHandler::base_type * const status_out = nullptr) const noexcept + { + if(!is_initialized_) return; + + // TODO check if shared memory is beneficial + + index_type * const tmp = temp_.get(); + cudaMemsetAsync(tmp, 0, sizeof(index_type), stream); + + kernels::num_values + <<>> + (keys_in, num_in, tmp, num_per_key_out, *this, probing_length, status_out); + +
cudaMemcpyAsync(&num_out, tmp, sizeof(index_type), D2H, stream); + + if(stream == 0) + { + cudaStreamSynchronize(stream); + } + } + + /*! \brief number of values stored inside the hash table + * \note alias for \c size() + * \param[in] stream CUDA stream in which this operation is executed + * \return the number of values + */ + HOSTQUALIFIER INLINEQUALIFIER + index_type num_values(const cudaStream_t stream = 0) const noexcept + { + return size(stream); + } + + /*! \brief number of values stored inside the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return the number of values (0 if the table is not initialized); this call synchronizes \c stream before returning + */ + HOSTQUALIFIER INLINEQUALIFIER + index_type size(const cudaStream_t stream = 0) const noexcept + { + if(!is_initialized_) return 0; + + index_type out; + index_type * tmp = temp_.get(); + + cudaMemsetAsync(tmp, 0, sizeof(index_t), stream); + + kernels::num_values + <<>> + (tmp, *this); + + cudaMemcpyAsync( + &out, + tmp, + sizeof(index_type), + D2H, + stream); + + cudaStreamSynchronize(stream); + + return out; + } + + /*! \brief current key load factor of the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return number of occupied key slots divided by the key capacity + */ + HOSTQUALIFIER INLINEQUALIFIER + float key_load_factor(const cudaStream_t stream = 0) const noexcept + { + return float(num_occupied(stream)) / float(capacity()); + } + + /*! \brief current value load factor of the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return number of stored values divided by the product of key capacity and bucket size + */ + HOSTQUALIFIER INLINEQUALIFIER + float value_load_factor(const cudaStream_t stream = 0) const noexcept + { + return float(num_values(stream)) / float(capacity()*bucket_size()); + } + + /*!
\brief current storage density of the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return bytes of the stored unique keys and values divided by the total bytes of the table + */ + HOSTQUALIFIER INLINEQUALIFIER + float storage_density(const cudaStream_t stream = 0) const noexcept + { + const index_type key_bytes = num_keys(stream) * sizeof(key_type); + const index_type value_bytes = num_values(stream) * sizeof(value_type); + const index_type table_bytes = bytes_total(); + + return float(key_bytes + value_bytes) / float(table_bytes); + } + + /*! \brief current relative storage density of the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return bytes of the stored unique keys and values divided by the bytes of the occupied key slots plus the stored values + */ + HOSTQUALIFIER INLINEQUALIFIER + float relative_storage_density(const cudaStream_t stream = 0) const noexcept + { + const index_type key_bytes = num_keys(stream) * sizeof(key_type); + const index_type value_bytes = num_values(stream) * sizeof(value_type); + const index_type occupied_bytes = + num_occupied(stream) * sizeof(key_type) + value_bytes; + + return float(key_bytes + value_bytes) / (occupied_bytes); + } + + /*! \brief get the key capacity of the hash table + * \return number of key slots in the hash table + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + index_type capacity() const noexcept + { + return table_.capacity(); + } + + /*! \brief get the maximum value capacity of the hash table + * \return maximum value capacity i.e. key capacity multiplied by the bucket size + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + index_type value_capacity() const noexcept + { + return table_.capacity() * bucket_size(); + } + + /*! \brief get the total number of bytes occupied by this data structure + * \return bytes + */ + HOSTQUALIFIER INLINEQUALIFIER + index_type bytes_total() const noexcept + { + return table_.bytes_total() + sizeof(index_type); + } + + /*!
\brief indicates if the hash table is properly initialized + * \return \c true iff the hash table is properly initialized + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + bool is_initialized() const noexcept + { + return is_initialized_; + } + + /*! \brief get the status of the hash table + * \param[in] stream CUDA stream in which this operation is executed + * \return the status ( \c not_initialized if no status memory is available) + */ + HOSTQUALIFIER INLINEQUALIFIER + status_type peek_status(const cudaStream_t stream = 0) const noexcept + { + status_type status = status_type::not_initialized(); + + if(status_ != nullptr) + { + cudaMemcpyAsync( + &status, + status_, + sizeof(status_type), + D2H, + stream); + + cudaStreamSynchronize(stream); + } + + return status; + } + + /*! \brief get the status of the hash table and reset it to \c table_.status() + * \param[in] stream CUDA stream in which this operation is executed + * \return the status before the reset ( \c not_initialized if no status memory is available) + */ + HOSTQUALIFIER INLINEQUALIFIER + status_type pop_status(const cudaStream_t stream = 0) noexcept + { + status_type status = status_type::not_initialized(); + + if(status_ != nullptr) + { + cudaMemcpyAsync( + &status, + status_, + sizeof(status_type), + D2H, + stream); + + assign_status(table_.status(), stream); + } + + return status; + } + + /*! \brief checks if \c key is equal to \c EmptyKey + * \return \c true iff \c key equals \c empty_key() + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr bool is_empty_key(const key_type key) noexcept + { + return (key == empty_key()); + } + + /*! \brief checks if \c key is equal to \c TombstoneKey + * \return \c true iff \c key equals \c tombstone_key() + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr bool is_tombstone_key(const key_type key) noexcept + { + return (key == tombstone_key()); + } + + /*! \brief checks if \c key is neither \c EmptyKey nor \c TombstoneKey + * \return \c true iff \c key differs from both \c empty_key() and \c tombstone_key() + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr bool is_valid_key(const key_type key) noexcept + { + return (key != empty_key() && key != tombstone_key()); + } + + /*!
\brief checks if \c value is equal to \c EmptyValue + * \return \c true iff \c value equals \c empty_value() + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr bool is_empty_value(const value_type value) noexcept + { + return (value == empty_value()); + } + + /*! \brief checks if \c value is not equal to \c EmptyValue + * \return \c true iff \c value differs from \c empty_value() + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr bool is_valid_value(const value_type value) noexcept + { + return (value != empty_value()); + } + + /*! \brief indicates if this object is a shallow copy + * \return \c true iff this object is a shallow copy + */ + HOSTDEVICEQUALIFIER INLINEQUALIFIER + bool is_copy() const noexcept + { + return is_copy_; + } + +private: + /*! \brief assigns the hash table's status + * \note \c const on purpose; does nothing if no status memory is available and synchronizes \c stream otherwise + * \param[in] status new status + * \param[in] stream CUDA stream in which this operation is executed + */ + HOSTQUALIFIER INLINEQUALIFIER + void assign_status( + const status_type status, + const cudaStream_t stream = 0) const noexcept + { + if(status_ != nullptr) + { + cudaMemcpyAsync( + status_, + &status, + sizeof(status_type), + H2D, + stream); + + cudaStreamSynchronize(stream); + } + } + + /*! \brief joins additional flags to the hash table's status + * \note \c const on purpose; the status is only written back if joining changes it + * \param[in] status status flags to join into the current status + * \param[in] stream CUDA stream in which this operation is executed + */ + HOSTQUALIFIER INLINEQUALIFIER + void join_status( + const status_type status, + const cudaStream_t stream = 0) const noexcept + { + if(status_ != nullptr) + { + status_type peeked = peek_status(stream); + const status_type joined = peeked + status; + + if(joined != peeked) + { + assign_status(joined, stream); + } + } + } + + /*!
\brief joins additional flags to the hash table's status from device code + * \note \c const on purpose; does nothing if no status memory is available + * \param[in] status status flags to join into the current status + */ + DEVICEQUALIFIER INLINEQUALIFIER + void device_join_status(const status_type status) const noexcept + { + if(status_ != nullptr) + { + status_->atomic_join(status); + } + } + + status_type * status_; //!< pointer to status + TableStorage table_; //!< actual key/value storage + temp_type temp_; //!< temporary memory + key_type seed_; //!< random seed + index_type max_values_per_key_; //!< maximum number of values to store per key + index_type * num_keys_; //!< pointer to the count of unique keys + index_type * num_occupied_; //!< pointer to the count of occupied key slots + bool is_copy_; //!< indicates if table is a shallow copy + bool is_initialized_; //!< indicates if table is properly initialized + + template + GLOBALQUALIFIER + friend void kernels::size(index_type * const, const Core); + + template + GLOBALQUALIFIER + friend void kernels::num_values(index_type * const, const Core); + + template + GLOBALQUALIFIER + friend void kernels::for_each(Func, const Core); + + template + GLOBALQUALIFIER + friend void kernels::for_each_unique_key(Func, const Core); + + template + GLOBALQUALIFIER + friend void kernels::retrieve( + const typename Core::key_type * const, + const index_type, + const index_type * const, + const index_type * const, + typename Core::value_type * const, + const Core, + const index_type, + typename StatusHandler::base_type * const); + +}; // class MultiBucketHashTable + +} // namespace warpcore + +#endif /* WARPCORE_MULTI_VALUE_HASH_TABLE_CUH */ \ No newline at end of file diff --git a/include/multi_value_hash_table.cuh b/include/multi_value_hash_table.cuh index 2c6b938..774d030 100755 --- a/include/multi_value_hash_table.cuh +++ b/include/multi_value_hash_table.cuh @@ -66,11 +66,7 @@ public: using value_type = Value; using index_type = index_t; using status_type = Status; - using key_set_type = HashSet< - Key, - EmptyKey,
TombstoneKey, - defaults::probing_scheme_t>; + using probing_scheme_type = ProbingScheme; /*! \brief get empty key * \return empty key @@ -203,13 +199,13 @@ public: * \param[in] probing_length maximum number of probing attempts * \return status (per thread) */ - DEVICEQUALIFIER INLINEQUALIFIER - status_type insert( - const key_type key_in, - const value_type& value_in, - const cg::thread_block_tile& group, - const index_type probing_length = defaults::probing_length()) noexcept - { + DEVICEQUALIFIER INLINEQUALIFIER + status_type insert( + const key_type key_in, + const value_type& value_in, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) noexcept + { if(!is_initialized_) { return status_type::not_initialized(); @@ -230,7 +226,7 @@ public: auto empty_mask = group.ballot(is_empty_key(table_key)); - num_values += __popc(group.ballot((table_key == key_in))); + num_values += __popc(group.ballot(table_key == key_in)); if(num_values >= max_values_per_key_) { @@ -292,7 +288,7 @@ public: status_type::probing_length_exceeded(); device_join_status(status); return status; - } + } /*! 
\brief insert a set of keys into the hash table * \tparam StatusHandler handles returned status per key (see \c status_handlers) @@ -572,22 +568,31 @@ public: { if(!is_initialized_) return; - const auto key_set = get_key_set(stream); - if(keys_out != nullptr) { - key_set.retrieve_all(keys_out, num_out, stream); + index_type * const tmp = temp_.get(); + cudaMemsetAsync(tmp, 0, sizeof(index_type), stream); + + kernels::for_each_unique_key + <<>> + ([=] DEVICEQUALIFIER (const key_type& key) + { + index_type out = helpers::atomicAggInc(tmp); + keys_out[out] = key; + }, *this); + + cudaMemcpyAsync(&num_out, tmp, sizeof(index_type), D2H, stream); + + if(stream == 0) + { + cudaStreamSynchronize(stream); + } } else { - num_out = key_set.size(stream); + num_out = num_keys(stream); join_status(status_type::dry_run(), stream); } - - if(stream == 0) - { - cudaStreamSynchronize(stream); - } } /*! \brief applies a funtion over all values of a specified key @@ -629,7 +634,7 @@ public: if(hit) { const auto j = - num + __popc(hit_mask & ~((2UL << group.thread_rank()) - 1)); + num + __popc(hit_mask & ((1UL << group.thread_rank()) - 1)); f(key_in, table_[i].value, j); } @@ -710,30 +715,6 @@ public: (f, keys_in, num_in, *this, status_out); } - /*! 
\brief \c warpcore::HashSet of unique keys inside the table - * \param[in] stream CUDA stream in which this operation is executed in - * \param[in] size_fraction capacity of the hash set in relation to the number of unique keys inside the table - * \return \c warpcore::HashSet - */ - HOSTQUALIFIER INLINEQUALIFIER - const key_set_type get_key_set( - const cudaStream_t stream = 0, - const float size_fraction = 0.9) const noexcept - { - const index_type set_capacity = num_keys(stream) / size_fraction; - key_set_type hash_set(set_capacity, seed_); - - for_each([=] DEVICEQUALIFIER - (const key_type key, const value_type& /* value */) mutable - { - hash_set.insert(key, cg::tiled_partition<1>(cg::this_thread_block())); - }, stream); - - cudaStreamSynchronize(stream); - - return hash_set; - } - /*! \brief number of unique keys inside the table * \param[in] stream CUDA stream in which this operation is executed in * \return number of unique keys @@ -1057,6 +1038,10 @@ private: GLOBALQUALIFIER friend void kernels::for_each(Func, const Core); + template + GLOBALQUALIFIER + friend void kernels::for_each_unique_key(Func, const Core); + template GLOBALQUALIFIER friend void kernels::retrieve( diff --git a/include/single_value_hash_table.cuh b/include/single_value_hash_table.cuh index a7dfab1..99957c0 100644 --- a/include/single_value_hash_table.cuh +++ b/include/single_value_hash_table.cuh @@ -209,13 +209,13 @@ public: * \param[in] probing_length maximum number of probing attempts * \return status (per thread) */ - DEVICEQUALIFIER INLINEQUALIFIER - status_type insert( - const key_type key_in, - const value_type& value_in, - const cg::thread_block_tile& group, - const index_type probing_length = defaults::probing_length()) noexcept - { + DEVICEQUALIFIER INLINEQUALIFIER + status_type insert( + const key_type key_in, + const value_type& value_in, + const cg::thread_block_tile& group, + const index_type probing_length = defaults::probing_length()) noexcept + { status_type status = 
status_type::unknown_error(); value_type * value_ptr = @@ -227,7 +227,7 @@ public: } return status; - } + } /*! \brief insert a set of keys into the hash table * \tparam StatusHandler handles returned status per key (see \c status_handlers) diff --git a/include/status.cuh b/include/status.cuh index 385d1a9..9818199 100644 --- a/include/status.cuh +++ b/include/status.cuh @@ -64,6 +64,8 @@ public: static constexpr Status invalid_phase_overlap() noexcept { return Status(one << 10); } HOSTDEVICEQUALIFIER INLINEQUALIFIER static constexpr Status max_values_for_key_reached() noexcept { return Status(one << 11); } + HOSTDEVICEQUALIFIER INLINEQUALIFIER + static constexpr Status invalid_value() noexcept { return Status(one << 12); } HOSTDEVICEQUALIFIER INLINEQUALIFIER static constexpr Status error_mask() noexcept @@ -121,6 +123,8 @@ public: constexpr bool has_invalid_phase_overlap() const noexcept { return status_ & invalid_phase_overlap().status_; } HOSTDEVICEQUALIFIER INLINEQUALIFIER constexpr bool has_max_values_for_key_reached() const noexcept { return status_ & max_values_for_key_reached().status_; } + HOSTDEVICEQUALIFIER INLINEQUALIFIER + constexpr bool has_invalid_value() const noexcept { return status_ & invalid_value().status_; } HOSTDEVICEQUALIFIER INLINEQUALIFIER constexpr Status& operator=(const Status& a) noexcept @@ -238,6 +242,8 @@ OStream& operator<<(OStream& os, Status status) msg.push_back("invalid phase overlap"); if(status.has_max_values_for_key_reached()) msg.push_back("max values for key reached"); + if(status.has_invalid_value()) + msg.push_back("invalid value"); // if(!status.has_any()) // msg.push_back("none"); diff --git a/include/storage.cuh b/include/storage.cuh index ce5b431..839b1d6 100644 --- a/include/storage.cuh +++ b/include/storage.cuh @@ -1375,26 +1375,26 @@ public: * \param[in] stream CUDA stream in which this operation is executed in * \return load factor */ - HOSTDEVICEQUALIFIER INLINEQUALIFIER - float load_factor(const cudaStream_t 
stream = 0) const noexcept - { - index_type load = 0; + HOSTDEVICEQUALIFIER INLINEQUALIFIER + float load_factor(const cudaStream_t stream = 0) const noexcept + { + index_type load = 0; - cudaMemcpyAsync( - &load, next_free_bucket_, sizeof(index_type), D2H, stream); + cudaMemcpyAsync( + &load, next_free_bucket_, sizeof(index_type), D2H, stream); - cudaStreamSynchronize(stream); + cudaStreamSynchronize(stream); - return float(load) / float(capacity()); - } + return float(load) / float(capacity()); + } /*! \brief get the number of occupied bytes * \param[in] stream CUDA stream in which this operation is executed in * \return bytes */ - HOSTDEVICEQUALIFIER INLINEQUALIFIER - index_type bytes_occupied(const cudaStream_t stream = 0) const noexcept - { + HOSTDEVICEQUALIFIER INLINEQUALIFIER + index_type bytes_occupied(const cudaStream_t stream = 0) const noexcept + { index_type occupied = 0; cudaMemcpyAsync( @@ -1403,7 +1403,7 @@ public: cudaStreamSynchronize(stream); return occupied * sizeof(bucket_type); - } + } /*! \brief get bucket growth factor * \return factor diff --git a/include/warpcore.cuh b/include/warpcore.cuh index 4d17ae8..4abdecc 100644 --- a/include/warpcore.cuh +++ b/include/warpcore.cuh @@ -7,6 +7,7 @@ #include "hash_set.cuh" #include "bloom_filter.cuh" #include "multi_value_hash_table.cuh" +#include "multi_bucket_hash_table.cuh" #include "bucket_list_hash_table.cuh" #endif /* WARPCORE_CUH */