Implement a special bounce buffer that can overlap read with h2d memcpy
kingcrimsontianyu committed Mar 7, 2025
1 parent ab867bd commit 9cfed2d
Showing 20 changed files with 530 additions and 85 deletions.
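
The commit's core pattern: rather than synchronizing the CUDA stream after every chunk's host-to-device copy, the copies are queued asynchronously so the next chunk's file read overlaps with the previous chunk's H2D memcpy. Below is a self-contained sketch of that pattern using the plain CUDA runtime API and a hypothetical two-block pinned buffer; the block size, function name, and error handling are illustrative and not taken from this commit.

// Sketch only: read/H2D overlap with a two-block pinned bounce buffer.
// The commit itself goes through kvikio's driver-API shims and BounceBuffer classes.
#include <cuda_runtime.h>
#include <unistd.h>
#include <algorithm>
#include <cstddef>

std::size_t overlapped_read(int fd, std::byte* dev_dst, std::size_t size,
                            off_t file_offset, cudaStream_t stream)
{
  constexpr std::size_t block_bytes = std::size_t{1} << 24;  // 16 MiB per block (assumed)
  std::byte* host_block[2];
  cudaEvent_t copy_done[2];
  for (int i = 0; i < 2; ++i) {
    cudaHostAlloc(reinterpret_cast<void**>(&host_block[i]), block_bytes, cudaHostAllocDefault);
    cudaEventCreate(&copy_done[i]);
    cudaEventRecord(copy_done[i], stream);  // mark both blocks as initially free
  }

  std::size_t done = 0;
  int which       = 0;
  while (done < size) {
    std::size_t const nbytes = std::min(block_bytes, size - done);
    // Wait only until the previous copy *from this block* has finished, then refill it.
    // The other block's copy can still be in flight, so the read overlaps the memcpy.
    cudaEventSynchronize(copy_done[which]);
    ssize_t const got =
      pread(fd, host_block[which], nbytes, file_offset + static_cast<off_t>(done));
    if (got <= 0) break;
    cudaMemcpyAsync(dev_dst + done, host_block[which], static_cast<std::size_t>(got),
                    cudaMemcpyHostToDevice, stream);
    cudaEventRecord(copy_done[which], stream);
    done += static_cast<std::size_t>(got);
    which ^= 1;  // ping-pong between the two pinned blocks
  }

  cudaStreamSynchronize(stream);  // one final sync instead of one per chunk
  for (int i = 0; i < 2; ++i) {
    cudaEventDestroy(copy_done[i]);
    cudaFreeHost(host_block[i]);
  }
  return done;
}
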
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -152,6 +152,7 @@ set(SOURCES
"src/shim/cufile.cpp"
"src/shim/utils.cpp"
"src/stream.cpp"
"src/threadpool_wrapper.cpp"
"src/utils.cpp"
)

76 changes: 73 additions & 3 deletions cpp/include/kvikio/bounce_buffer.hpp
@@ -15,9 +15,11 @@
*/
#pragma once

#include <mutex>
#include <stack>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/shim/utils.hpp>

namespace kvikio {

@@ -33,7 +35,7 @@ class AllocRetain {
// Stack of free allocations
std::stack<void*> _free_allocs{};
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};
std::size_t _size{}; // Decouple this class from the defaults singleton.

public:
/**
@@ -57,7 +59,7 @@ class AllocRetain {
std::size_t size() noexcept;
};

AllocRetain() = default;
AllocRetain();

// Notice, we do not clear the allocations at destruction thus the allocations leaks
// at exit. We do this because `AllocRetain::instance()` stores the allocations in a
@@ -102,4 +104,72 @@ class AllocRetain {
AllocRetain& operator=(AllocRetain&& o) = delete;
};

class BlockView;

class Block {
private:
std::byte* _buffer{nullptr};
std::size_t _bytes{0u};

public:
Block() = default;
~Block() = default;
Block(Block const&) = delete;
Block& operator=(Block const&) = delete;
Block(Block&&) = default;
Block& operator=(Block&&) = default;

void allocate(std::size_t bytes);
void deallocate();

BlockView make_view(std::size_t start_byte_idx, std::size_t bytes);
std::size_t size() const noexcept;
std::byte* data() const noexcept;
};

class BlockView {
private:
std::byte* _buffer{nullptr};
std::size_t _bytes{0u};

public:
BlockView(std::byte* buffer, std::size_t bytes);
BlockView(BlockView const&) = default;
BlockView& operator=(BlockView const&) = default;
BlockView(BlockView&&) = default;
BlockView& operator=(BlockView&&) = default;

std::size_t size() const noexcept;
std::byte* data() const noexcept;
};

class BounceBuffer {
private:
BounceBuffer() = default;

std::size_t _requested_bytes_per_block{1024u * 1024u * 16u};
std::size_t _num_blocks{4u};

inline static Block block_pool;
std::vector<BlockView> _blockviews_pool;

Block _block;
std::vector<BlockView> _blockviews;

public:
static BounceBuffer& instance();

static void preinitialize_for_pool(unsigned int num_threads,
std::size_t requested_bytes_per_block,
std::size_t num_blocks);
void initialize_per_thread(std::size_t requested_bytes_per_block, std::size_t num_blocks);

BlockView get();

BounceBuffer(BounceBuffer const&) = delete;
BounceBuffer& operator=(BounceBuffer const&) = delete;
BounceBuffer(BounceBuffer&&) = delete;
BounceBuffer& operator=(BounceBuffer&&) = delete;
};

} // namespace kvikio
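
Taken together with the call sites in posix_io.hpp further down, the intended use appears to be: the block pool is pre-initialized once for the thread pool, and each worker thread then obtains a BlockView of host staging memory. A hedged usage sketch follows; the initialization values and pool semantics are assumptions, and only the signatures above come from the commit.

// Hypothetical usage sketch of the new bounce-buffer API; not part of the commit.
#include <cstddef>
#include <kvikio/bounce_buffer.hpp>

void bounce_buffer_usage_sketch(unsigned int num_threads)
{
  // Presumably called once before the thread pool starts; the values below simply
  // mirror the defaults declared above (16 MiB per block, 4 blocks).
  kvikio::BounceBuffer::preinitialize_for_pool(
    num_threads, /*requested_bytes_per_block=*/std::size_t{16} << 20, /*num_blocks=*/4);

  // Inside a worker thread: obtain a view of host staging memory for this thread.
  kvikio::BlockView view = kvikio::BounceBuffer::instance().get();
  std::byte* staging     = view.data();  // staging area used for POSIX read/write
  std::size_t capacity   = view.size();  // capacity of this view
  (void)staging;
  (void)capacity;
}
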
9 changes: 6 additions & 3 deletions cpp/include/kvikio/defaults.hpp
@@ -62,16 +62,16 @@ std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> defau
*/
class defaults {
private:
BS_thread_pool _thread_pool{get_num_threads_from_env()};
std::unique_ptr<BS_thread_pool> _thread_pool;
CompatMode _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _num_subtasks_per_task;
std::size_t _http_max_attempts;
long _http_timeout;
std::vector<int> _http_status_codes;

static unsigned int get_num_threads_from_env();
std::function<void()> _worker_thread_init_func;

defaults();

@@ -236,6 +236,9 @@ class defaults {
*/
static void set_bounce_buffer_size(std::size_t nbytes);

[[nodiscard]] static std::size_t num_subtasks_per_task();
static void set_num_subtasks_per_task(std::size_t num_subtasks_per_task);

/**
* @brief Get the maximum number of attempts per remote IO read.
*
6 changes: 6 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
@@ -364,6 +364,12 @@ class FileHandle {
off_t devPtr_offset = 0,
CUstream stream = nullptr);

std::size_t read_async_v2(void* devPtr_base,
std::size_t size,
std::size_t file_offset = 0,
std::size_t devPtr_offset = 0,
CUstream stream = nullptr);

/**
* @brief Writes specified bytes from the device memory into the file asynchronously.
*
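
A minimal call sketch for the new read_async_v2 declaration above, assuming it mirrors the existing read_async parameters (device base pointer, size, file offset, device-pointer offset, stream); only the signature is given by this commit.

// Hypothetical call site; read_async_v2 semantics are inferred from read_async.
#include <cstddef>
#include <cuda.h>
#include <kvikio/file_handle.hpp>

std::size_t read_async_v2_sketch(kvikio::FileHandle& fh,
                                 void* dev_buf,
                                 std::size_t nbytes,
                                 CUstream stream)
{
  // Read `nbytes` from the start of the file into `dev_buf`, presumably staging
  // through the bounce buffer and issuing H2D copies on `stream`.
  return fh.read_async_v2(dev_buf, nbytes, /*file_offset=*/0, /*devPtr_offset=*/0, stream);
}
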
5 changes: 5 additions & 0 deletions cpp/include/kvikio/nvtx.hpp
@@ -123,6 +123,11 @@ class NvtxManager {
NvtxManager() = default;
};

struct NvtxData {
std::uint64_t nvtx_payload{0ull};
nvtx_color_type nvtx_color{NvtxManager::default_color()};
};

/**
* @brief Convenience macro for generating an NVTX range in the `libkvikio` domain
* from the lifetime of a function.
103 changes: 83 additions & 20 deletions cpp/include/kvikio/parallel_operation.hpp
@@ -20,6 +20,7 @@
#include <future>
#include <numeric>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

@@ -42,26 +43,21 @@ namespace detail {
*
* @return A pair of NVTX color and call index.
*/
inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
inline NvtxData const get_nvtx_data() noexcept
{
static std::atomic_uint64_t call_counter{1ull};
auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
return {nvtx_color, call_idx};
return {call_idx, nvtx_color};
}

template <typename F, typename T>
std::future<std::size_t> submit_task(F op,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = NvtxManager::default_color())
template <typename F, typename... Args>
std::future<std::size_t> submit_task(F op, NvtxData nvtx_data, Args... args)
{
static_assert(std::is_invocable_r_v<std::size_t, F, Args...>);
return defaults::thread_pool().submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
return op(buf, size, file_offset, devPtr_offset);
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
return op(args...);
});
}

@@ -81,19 +77,18 @@ std::future<std::size_t> submit_task(F op,
*/
template <typename F, typename T>
std::future<std::size_t> parallel_io(F op,
NvtxData nvtx_data,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t task_size,
std::size_t devPtr_offset,
std::uint64_t call_idx = 0,
nvtx_color_type nvtx_color = NvtxManager::default_color())
std::size_t devPtr_offset)
{
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);

// Single-task guard
if (task_size >= size || page_size >= size) {
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset);
}

// We know an upper bound of the total number of tasks
Expand All @@ -102,17 +97,85 @@ std::future<std::size_t> parallel_io(F op,

// 1) Submit `task_size` sized tasks
while (size >= task_size) {
tasks.push_back(
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
tasks.push_back(detail::submit_task(op, nvtx_data, buf, task_size, file_offset, devPtr_offset));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
}

// 2) Submit a task for the remainder
if (size > 0) {
tasks.push_back(
detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color));
tasks.push_back(detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset));
}

// Finally, we sum the result of all tasks.
auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
std::size_t ret = 0;
for (auto& task : tasks) {
ret += task.get();
}
return ret;
};
return std::async(std::launch::deferred, gather_tasks, std::move(tasks));
}

template <typename F, typename T>
std::future<std::size_t> parallel_io_for_async_memcpy(F op,
NvtxData nvtx_data,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t task_size,
std::size_t devPtr_offset)
{
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);

// Single-task guard
if (task_size >= size || page_size >= size) {
return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset, true);
}

std::vector<std::future<std::size_t>> tasks;
tasks.reserve((size + task_size - 1) / task_size);
auto const num_subtasks_per_task = defaults::num_subtasks_per_task();
auto const subtask_size = (task_size + num_subtasks_per_task - 1) / num_subtasks_per_task;

// 1) Submit `task_size` sized tasks, each composed of subtasks.
while (size >= task_size) {
auto task = [=]() -> std::size_t {
for (std::size_t subtask_idx = 0; subtask_idx < num_subtasks_per_task; ++subtask_idx) {
bool current_sync_stream{false};
auto current_subtask_size{subtask_size};
if (subtask_idx == num_subtasks_per_task - 1) {
current_sync_stream = true;
current_subtask_size = task_size - subtask_size * subtask_idx;
}
op(buf, current_subtask_size, file_offset, devPtr_offset, current_sync_stream);
}
return task_size;
};
tasks.push_back(detail::submit_task(task, nvtx_data));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
}

// 2) Submit subtasks for the remainder.
if (size > 0) {
auto task = [=]() -> std::size_t {
auto const num_subtasks = (size + subtask_size - 1) / subtask_size;
for (std::size_t subtask_idx = 0; subtask_idx < num_subtasks; ++subtask_idx) {
bool current_sync_stream{false};
auto current_subtask_size{subtask_size};
if (subtask_idx == num_subtasks - 1) {
current_sync_stream = true;
current_subtask_size = size - subtask_size * subtask_idx;
}
op(buf, current_subtask_size, file_offset, devPtr_offset, current_sync_stream);
}
return size;
};
tasks.push_back(detail::submit_task(task, nvtx_data));
}

// Finally, we sum the result of all tasks.
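
The new overload expects an op that takes a trailing sync_stream flag: only the last subtask of each task waits on the CUDA stream, so the H2D copies of earlier subtasks stay queued behind the next file read. A sketch of how such an op might be built from the posix_device_read declared in the next file; the wiring is an assumption, while the signatures are from this commit.

// Sketch (assumption): an op suitable for parallel_io_for_async_memcpy, wrapping the
// stream-aware posix_device_read and deferring synchronization to the final subtask.
#include <cstddef>
#include <cuda.h>
#include <kvikio/posix_io.hpp>
#include <kvikio/shim/cuda.hpp>

inline auto make_posix_read_op(int fd, CUstream stream)
{
  return [fd, stream](void const* devPtr_base, std::size_t size, std::size_t file_offset,
                      std::size_t devPtr_offset, bool sync_stream) -> std::size_t {
    auto nbytes = kvikio::detail::posix_device_read(fd, devPtr_base, size, file_offset,
                                                    devPtr_offset, stream);
    if (sync_stream) {  // only the final subtask of a task synchronizes the stream
      CUDA_DRIVER_TRY(kvikio::cudaAPI::instance().StreamSynchronize(stream));
    }
    return nbytes;
  };
}
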
24 changes: 12 additions & 12 deletions cpp/include/kvikio/posix_io.hpp
@@ -133,31 +133,29 @@ std::size_t posix_device_io(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset)
std::size_t devPtr_offset,
CUstream stream)
{
auto alloc = AllocRetain::instance().get();
auto alloc = BounceBuffer::instance().get();
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
off_t const chunk_size2 = convert_size2off(alloc.size());

// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();

while (byte_remaining > 0) {
off_t const nbytes_requested = std::min(chunk_size2, byte_remaining);
ssize_t nbytes_got = nbytes_requested;
if constexpr (Operation == IOOperationType::READ) {
nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
fd, alloc.data(), nbytes_requested, cur_file_offset);
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.data(), nbytes_got, stream));
} else { // Is a write operation
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
cudaAPI::instance().MemcpyDtoHAsync(alloc.data(), devPtr, nbytes_requested, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
fd, alloc.data(), nbytes_requested, cur_file_offset);
}
cur_file_offset += nbytes_got;
devPtr += nbytes_got;
@@ -227,7 +225,8 @@ std::size_t posix_device_read(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset);
std::size_t devPtr_offset,
CUstream stream);

/**
* @brief Write device memory to disk using POSIX
Expand All @@ -246,6 +245,7 @@ std::size_t posix_device_write(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset);
std::size_t devPtr_offset,
CUstream stream);

} // namespace kvikio::detail
2 changes: 2 additions & 0 deletions cpp/include/kvikio/shim/cuda.hpp
@@ -48,6 +48,8 @@ class cudaAPI {
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};
decltype(cuStreamCreate)* StreamCreate{nullptr};
decltype(cuStreamDestroy)* StreamDestroy{nullptr};
decltype(cuDeviceGetCount)* DeviceGetCount{nullptr};
decltype(cuDevicePrimaryCtxGetState)* DevicePrimaryCtxGetState{nullptr};

private:
cudaAPI();