Skip to content

Commit

Permalink
Add join example
Browse files Browse the repository at this point in the history
  • Loading branch information
maawad committed Jan 25, 2024
1 parent f0313cf commit c114ca2
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 12 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(example_targets
bcht_example
custom_types_example
iht_example
hash_join
)

foreach(target ${example_targets})
Expand Down
130 changes: 130 additions & 0 deletions examples/hash_join.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@


#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <type_traits>
#include <vector>

#include <thrust/device_vector.h>
#include <thrust/for_each.h>
#include <thrust/host_vector.h>
#include <thrust/iterator/counting_iterator.h>

#include <iht.hpp>
#include <tile_wide_queue.hpp>

/**
 * @brief Joins two hash tables on their keys and stores the summed values.
 *
 * Each thread reads at most one slot of `read_from` (the slot whose index equals
 * the thread's flat global index). The threads of a tile of
 * `HashMap::bucket_size` lanes then take turns: every valid pair held by a lane
 * is broadcast to the whole tile, looked up in `query_into`, and, when found,
 * `{key, read_from value + query_into value}` is inserted into `result_table`.
 *
 * Launch requirements:
 *  - 1D grid and block; the grid must supply at least `read_from.max_size()`
 *    threads, because slots beyond the launched thread count are never visited.
 *  - NOTE(review): `blockDim.x` presumably has to be a multiple of
 *    `HashMap::bucket_size` so that every tile is complete -- confirm against
 *    the cooperative-groups requirements for `tiled_partition`.
 *
 * @tparam HashMap Table type exposing `value_type`, `bucket_size`, `begin()`,
 *         `max_size()`, `get_sentinel_pair()` and tile-cooperative `find` / `insert`.
 * @param read_from    Table whose slots are scanned.
 * @param query_into   Table probed with every key found in `read_from`.
 * @param result_table Table receiving one pair per matching key. The return
 *                     value of `insert` is not checked here.
 */
template <typename HashMap>
__global__ void join(HashMap read_from, HashMap query_into, HashMap result_table) {
  using pair_type = typename HashMap::value_type;

  auto block = cooperative_groups::this_thread_block();
  auto tile = cooperative_groups::tiled_partition<HashMap::bucket_size>(block);

  using queue_type = bght::tile_wide_queue<pair_type, decltype(tile)>;

  // Widen before multiplying so the flat index cannot wrap for very large tables.
  const auto thread_id =
      static_cast<std::size_t>(blockIdx.x) * blockDim.x + threadIdx.x;

  const auto capacity = read_from.max_size();
  const auto begin = read_from.begin();

  const auto sentinel_pair = read_from.get_sentinel_pair();
  // A miss must be detected with the sentinel of the table that is actually
  // probed; the two tables are not required to share the same sentinel value.
  // (Assumes `find` returns the table's sentinel value for an absent key.)
  const auto missing_value = query_into.get_sentinel_pair().second;

  // Threads past the end of the table carry the sentinel so that they still take
  // part in the tile-wide operations below without contributing any work.
  const auto pair = thread_id < capacity
                        ? (begin + thread_id)->load(cuda::memory_order_relaxed)
                        : sentinel_pair;

  queue_type work_queue(pair, sentinel_pair, tile);

  // The queue state is identical on every lane of the tile, so all lanes run the
  // same number of iterations and enter find/insert together.
  while (!work_queue.empty()) {
    auto cur_pair = work_queue.front();
    work_queue.pop();

    auto value = query_into.find(cur_pair.first, tile);
    // if the key exists in both tables
    if (value != missing_value) {
      // store into the result table the sum of the two values
      pair_type new_pair{cur_pair.first, cur_pair.second + value};
      result_table.insert(new_pair, tile);
    }
  }
}

/**
 * @brief Prints every occupied slot of a table together with the value the
 *        example expects for it.
 *
 * One thread inspects one slot (the slot at the thread's flat global index).
 * Threads whose index is not below `result_table.max_size()` work on the
 * sentinel pair instead and therefore print nothing. The expected value printed
 * next to each entry is `key * 2 * 10`.
 *
 * @tparam HashMap Table type exposing `value_type`, `begin()`, `max_size()`,
 *         `get_sentinel_key()` and `get_sentinel_pair()`.
 * @param result_table Table to dump. Keys and values are printed with `%u`.
 */
template <typename HashMap>
__global__ void print(HashMap result_table) {
  using pair_type = typename HashMap::value_type;

  const auto slot_index = blockIdx.x * blockDim.x + threadIdx.x;
  const auto slots = result_table.begin();
  const auto empty_key = result_table.get_sentinel_key();

  // Start from the sentinel and overwrite it only when this thread owns a slot.
  pair_type entry = result_table.get_sentinel_pair();
  if (slot_index < result_table.max_size()) {
    entry = slots[slot_index].load(cuda::memory_order_relaxed);
  }

  // Empty slots (and out-of-range threads) carry the sentinel key: skip them.
  if (entry.first == empty_key) {
    return;
  }

  printf("result_map[%u] = %u, exepcted %u\n",
         entry.first,
         entry.second,
         entry.first * 2 * 10);
}
/**
 * @brief Hash-join example: builds two tables, joins them on the key and prints
 *        the result.
 *
 * Keys 3, 4 and 5 are present in both inputs, so the result table is expected to
 * hold {3, 60}, {4, 80} and {5, 100}.
 *
 * @return EXIT_SUCCESS on success, EXIT_FAILURE when one of the input tables
 *         could not be built.
 */
int main(int, char**) {
  using key_type = uint32_t;
  using value_type = uint32_t;
  using pair_type = bght::pair<key_type, value_type>;

  std::vector<pair_type> read_from_input_h = {
      {1, 10}, {2, 20}, {3, 30}, {4, 40}, {5, 50}};
  std::vector<pair_type> query_into_input_h = {
      {3, 30}, {4, 40}, {5, 50}, {6, 60}, {7, 70}};

  auto invalid_key = std::numeric_limits<key_type>::max();
  auto invalid_value = std::numeric_limits<value_type>::max();

  const float load_factor = 0.7f;

  const auto num_read_from_keys = read_from_input_h.size();
  const auto num_query_into_keys = query_into_input_h.size();

  // Requested capacities (truncated towards zero); the tables report the
  // capacity they actually use through max_size().
  const auto read_from_capacity =
      static_cast<std::size_t>(double(num_read_from_keys) / load_factor);
  const auto query_into_capacity =
      static_cast<std::size_t>(double(num_query_into_keys) / load_factor);

  using hash_map = bght::iht<key_type, value_type>;

  hash_map read_from_map(read_from_capacity, invalid_key, invalid_value);
  hash_map query_into_map(query_into_capacity, invalid_key, invalid_value);

  thrust::device_vector<pair_type> read_from_input(read_from_input_h);
  thrust::device_vector<pair_type> query_into_input(query_into_input_h);

  // do the insertions which can execute concurrently, but here we use the default stream
  bool success = read_from_map.insert(read_from_input.data().get(),
                                      read_from_input.data().get() + num_read_from_keys);
  if (!success) {
    std::cerr << "Failed to build first map" << std::endl;
    return EXIT_FAILURE;
  }
  // The end pointer is derived from the number of input pairs, not from the
  // table capacity: the capacity is larger than the input, so using it would
  // read past the end of the device vector.
  success = query_into_map.insert(query_into_input.data().get(),
                                  query_into_input.data().get() + num_query_into_keys);
  if (!success) {
    std::cerr << "Failed to build second map" << std::endl;
    return EXIT_FAILURE;
  }

  // build the result: a join cannot produce more entries than the smaller input
  const auto worst_case_capacity = std::min(query_into_capacity, read_from_capacity);
  hash_map result_map(worst_case_capacity, invalid_key, invalid_value);

  const uint32_t block_size = 128;
  // Ceil-division so that a partially filled last block is still launched.
  const auto blocks_for = [block_size](std::size_t num_threads) {
    return static_cast<uint32_t>((num_threads + block_size - 1) / block_size);
  };

  // The join kernel assigns one thread per table *slot*, so the grid has to cover
  // the capacity of the scanned table rather than the number of inserted keys.
  uint32_t num_blocks = blocks_for(read_from_map.max_size());
  join<<<num_blocks, block_size>>>(read_from_map, query_into_map, result_map);
  cuda_try(cudaGetLastError());       // launch-configuration errors
  cuda_try(cudaDeviceSynchronize());  // errors raised while the kernel ran

  std::cout << "Join complete" << std::endl;

  // Likewise one thread per slot of the result table.
  num_blocks = blocks_for(result_map.max_size());
  print<<<num_blocks, block_size>>>(result_map);
  cuda_try(cudaGetLastError());
  // Wait for the kernel before exiting; device-side printf output is only
  // flushed at synchronization points.
  cuda_try(cudaDeviceSynchronize());

  return EXIT_SUCCESS;
}
11 changes: 7 additions & 4 deletions include/detail/iht_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ template <class Key,
int B,
int Threshold>
typename iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::const_iterator
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::begin() const {
__device__ __host__
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::begin() const {
return d_table_;
}

Expand All @@ -360,7 +361,8 @@ template <class Key,
int B,
int Threshold>
typename iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::const_iterator
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::end() const {
__device__ __host__
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::end() const {
return d_table_ + capacity_;
}

Expand All @@ -372,8 +374,9 @@ template <class Key,
typename Allocator,
int B,
int Threshold>
typename iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::size_type
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::max_size() const {
__device__ __host__
typename iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::size_type
iht<Key, T, Hash, KeyEqual, Scope, Allocator, B, Threshold>::max_size() const {
return capacity_;
}
} // namespace bght
8 changes: 4 additions & 4 deletions include/detail/pair.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ struct alignas(detail::pair_alignment<T1, T2>()) padded_pair {
padded_pair& operator=(padded_pair const&) = default;
padded_pair& operator=(padded_pair&&) = default;

__host__ __device__ inline bool operator==(const padded_pair& rhs) {
__host__ __device__ inline bool operator==(const padded_pair& rhs) const {
return (this->first == rhs.first) && (this->second == rhs.second);
}
__host__ __device__ inline bool operator!=(const padded_pair& rhs) {
__host__ __device__ inline bool operator!=(const padded_pair& rhs) const {
return !(*this == rhs);
}

Expand All @@ -57,10 +57,10 @@ struct alignas(detail::pair_alignment<T1, T2>()) padded_pair<T1, T2, true> {
padded_pair& operator=(padded_pair const&) = default;
padded_pair& operator=(padded_pair&&) = default;

__host__ __device__ inline bool operator==(const padded_pair& rhs) {
__host__ __device__ inline bool operator==(const padded_pair& rhs) const {
return (this->first == rhs.first) && (this->second == rhs.second);
}
__host__ __device__ inline bool operator!=(const padded_pair& rhs) {
__host__ __device__ inline bool operator!=(const padded_pair& rhs) const {
return !(*this == rhs);
}

Expand Down
31 changes: 27 additions & 4 deletions include/iht.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct iht {
static constexpr auto bucket_size = B;
using key_equal = KeyEqual;

using iterator = atomic_pair_type&;
using iterator = atomic_pair_type*;
using const_iterator = iterator;

/**
Expand Down Expand Up @@ -194,21 +194,44 @@ struct iht {
*
* @return const_iterator constant iterator to the first element of the table
*/
const_iterator begin() const;
__device__ __host__ const_iterator begin() const;
/**
* @brief Returns an iterator one past the last slot of the table; the range
* [begin(), end()) covers every slot, including all invalid entries.
*
* @return const_iterator constant iterator one past the last slot of the table
*/
const_iterator end() const;
__device__ __host__ const_iterator end() const;

/**
* @brief Returns the maximum number of elements the container is able to hold
*
* @return size_type maximum number of elements including all invalid entries.
*/
size_type max_size() const;
__device__ __host__ size_type max_size() const;

/**
* @brief Get the sentinel key object
*
* @return key_type Sentinel key
*/
__device__ __host__ key_type get_sentinel_key() const { return sentinel_key_; }

/**
* @brief Get the sentinel value object
*
* @return mapped_type Sentinel value
*/
__device__ __host__ mapped_type get_sentinel_value() const { return sentinel_value_; }

/**
* @brief Get the sentinel pair object
*
* @return value_type Sentinel pair
*/
__device__ __host__ value_type get_sentinel_pair() const {
return {get_sentinel_key(), get_sentinel_value()};
}

private:
template <typename InputIt, typename HashMap>
Expand Down
65 changes: 65 additions & 0 deletions include/tile_wide_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 The Regents of the University of California, Davis
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cooperative_groups.h>
namespace bght {
/**
 * @brief Queue over the elements held by the lanes of a cooperative-groups tile.
 *
 * Every lane of the tile contributes one element and a flag telling whether that
 * element is valid. The queue exposes the valid elements one at a time, lowest
 * lane rank first, to all lanes of the tile at once.
 *
 * All member functions except `empty()` and `size()` perform tile-wide
 * operations (`ballot` / `shfl`) and must therefore be called by every lane of
 * the tile. `mask_` and `cur_` are derived from a ballot, so they hold the same
 * value on every lane and `empty()` / `size()` are uniform across the tile.
 *
 * @tparam T  Element type; must be copyable and usable with the tile's `shfl`.
 * @tparam CG Tile type providing `ballot`, `shfl` and `thread_rank`. The lane
 *            mask is kept in 32 bits.
 */
template <typename T, typename CG>
struct tile_wide_queue {
  using value_type = T;
  using reference = value_type&;
  using const_reference = const value_type&;
  using size_type = uint32_t;

  /**
   * @brief Builds the queue; a lane is valid when its element differs from `sentinel`.
   *
   * @param element  This lane's element (copied into the queue).
   * @param sentinel Value marking "no element" for this lane.
   * @param group    Tile the queue operates on.
   */
  __device__ tile_wide_queue(const T& element, const T& sentinel, const CG& group)
      : element_(element), cg_{group}, active_lane_(element_ != sentinel) {
    build();
  }

  /**
   * @brief Builds the queue with an explicit validity flag per lane.
   *
   * @param element This lane's element (copied into the queue).
   * @param valid   Whether this lane contributes its element.
   * @param group   Tile the queue operates on.
   */
  __device__ tile_wide_queue(const T& element, const bool& valid, const CG& group)
      : element_(element), cg_{group}, active_lane_(valid) {
    build();
  }

  /**
   * @brief Returns the element of the lowest-ranked valid lane to every lane.
   *
   * The result is only meaningful while the queue is not empty: on an empty
   * queue `cur_` does not name a lane.
   */
  __device__ value_type front() const {
    auto item = cg_.shfl(element_, cur_);
    return item;
  }

  /**
   * @brief Removes the front element; does nothing when the queue is empty.
   */
  __device__ void pop() {
    if (!empty()) {
      // Only the lane that owns the front element retires itself.
      if (cg_.thread_rank() == cur_) {
        active_lane_ = false;
      }
      build();
    }
  }

  /// @brief True when no lane holds a valid element any more.
  [[nodiscard]] __device__ bool empty() const { return mask_ == 0; }

  /// @brief Number of lanes that still hold a valid element.
  __device__ size_type size() const { return __popc(mask_); }

 private:
  /// @brief Recomputes the mask of valid lanes and the rank of the first one.
  __device__ void build() {
    mask_ = cg_.ballot(active_lane_);
    // __ffs is 1-based and yields 0 for an empty mask, so cur_ wraps around in
    // that case; callers are expected to check empty() first.
    cur_ = __ffs(mask_) - 1;
  }

  // Stored by value: a reference member would dangle whenever the constructor
  // is handed a temporary.
  T element_;
  const CG cg_;
  bool active_lane_;
  uint32_t mask_;
  uint32_t cur_;
};
} // namespace bght

0 comments on commit c114ca2

Please sign in to comment.