Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/CUDA_IPC: Use active-queues to track outstanding work #9654

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
UCT/CUDA_IPC: remove max 16 peer limitation
Akshay-Venkatesh committed Feb 1, 2024
commit 1c892ffb387d7dc5309ae86d146ec70e693b67c3
13 changes: 4 additions & 9 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
Original file line number Diff line number Diff line change
@@ -104,18 +104,13 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
mapped_rem_addr = (void *) ((uintptr_t) mapped_addr + offset);
ucs_assert(offset <= key->b_len);

if (!iface->streams_initialized) {
status = uct_cuda_ipc_iface_init_streams(iface);
if (UCS_OK != status) {
return status;
}
status = uct_cuda_ipc_get_queue_desc(iface, key->dev_num, &q_desc);
if (status != UCS_OK) {
return UCS_ERR_IO_ERROR;
}

key->dev_num %= iface->config.max_streams; /* round-robin */

q_desc = &iface->queue_desc[key->dev_num];
event_q = &q_desc->event_queue;
stream = &iface->queue_desc[key->dev_num].stream;
stream = &q_desc->stream;
cuda_ipc_event = ucs_mpool_get(&iface->event_desc);

if (ucs_unlikely(cuda_ipc_event == NULL)) {
110 changes: 69 additions & 41 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
Original file line number Diff line number Diff line change
@@ -30,10 +30,6 @@ static ucs_config_field_t uct_cuda_ipc_iface_config_table[] = {
"Max number of event completions to pick during cuda events polling",
ucs_offsetof(uct_cuda_ipc_iface_config_t, max_poll), UCS_CONFIG_TYPE_UINT},

{"MAX_STREAMS", "16",
"Max number of CUDA streams to make concurrent progress on",
ucs_offsetof(uct_cuda_ipc_iface_config_t, max_streams), UCS_CONFIG_TYPE_UINT},

{"CACHE", "y",
"Enable remote endpoint IPC memhandle mapping cache",
ucs_offsetof(uct_cuda_ipc_iface_config_t, enable_cache),
@@ -432,24 +428,6 @@ static void uct_cuda_ipc_event_desc_cleanup(ucs_mpool_t *mp, void *obj)
}
}

ucs_status_t uct_cuda_ipc_iface_init_streams(uct_cuda_ipc_iface_t *iface)
{
ucs_status_t status;
int i;

for (i = 0; i < iface->config.max_streams; i++) {
status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamCreate(&iface->queue_desc[i].stream,
CU_STREAM_NON_BLOCKING));
if (UCS_OK != status) {
return status;
}
}

iface->streams_initialized = 1;

return UCS_OK;
}

static ucs_status_t
uct_cuda_ipc_estimate_perf(uct_iface_h tl_iface, uct_perf_attr_t *perf_attr)
{
@@ -500,6 +478,65 @@ uct_cuda_ipc_estimate_perf(uct_iface_h tl_iface, uct_perf_attr_t *perf_attr)
return UCS_OK;
}

static ucs_status_t
uct_cuda_ipc_queue_desc_init(uct_cuda_queue_desc_t *q_desc)
{
    ucs_status_t status;

    /* Prepare a per-peer queue descriptor: an empty event queue plus a
     * dedicated non-blocking CUDA stream for async copies. */
    ucs_queue_head_init(&q_desc->event_queue);
    status = UCT_CUDADRV_FUNC_LOG_ERR(
            cuStreamCreate(&q_desc->stream, CU_STREAM_NON_BLOCKING));
    return status;
}

static ucs_status_t
uct_cuda_ipc_queue_desc_cleanup(uct_cuda_queue_desc_t *q_desc)
{
    /* Destroy the CUDA stream owned by this queue descriptor; the event
     * queue itself holds no resources that need explicit release. */
    ucs_status_t status;

    status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamDestroy(q_desc->stream));
    return status;
}

ucs_status_t uct_cuda_ipc_get_queue_desc(uct_cuda_ipc_iface_t *iface, int index,
                                         uct_cuda_queue_desc_t **q_desc_p)
{
    uct_cuda_queue_desc_t *elem;
    ucs_status_t status;
    khiter_t khiter;
    int khret;

    /* Look up (or reserve a slot for) the queue descriptor keyed by the
     * peer index. kh_put inserts the key if absent and reports via khret
     * whether the key was already present. */
    khiter = kh_put(cuda_ipc_queue_desc, &iface->queue_desc_map, index,
                    &khret);
    if (khret == UCS_KH_PUT_FAILED) {
        ucs_error("cannot allocate hash entry");
        return UCS_ERR_NO_MEMORY;
    }

    if (khret == UCS_KH_PUT_KEY_PRESENT) {
        /* Descriptor already created on a previous call - reuse it */
        elem = kh_value(&iface->queue_desc_map, khiter);
    } else {
        /* First request for this key: allocate and initialize a fresh
         * descriptor, then publish it in the map. On any failure the
         * just-inserted key is removed so the map stays consistent. */
        elem = ucs_malloc(sizeof(*elem), "cuda_ipc_queue_desc");
        if (elem == NULL) {
            ucs_error("failed to allocate queue desc");
            status = UCS_ERR_NO_MEMORY;
            goto err_kh_del;
        }

        status = uct_cuda_ipc_queue_desc_init(elem);
        if (status != UCS_OK) {
            ucs_free(elem);
            goto err_kh_del;
        }

        kh_value(&iface->queue_desc_map, khiter) = elem;
    }

    *q_desc_p = elem;
    return UCS_OK;

err_kh_del:
    kh_del(cuda_ipc_queue_desc, &iface->queue_desc_map, khiter);
    return status;
}

static ucs_mpool_ops_t uct_cuda_ipc_event_desc_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
@@ -525,7 +562,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke
uct_cuda_ipc_iface_config_t *config = NULL;
ucs_status_t status;
ucs_mpool_params_t mp_params;
int i;

config = ucs_derived_of(tl_config, uct_cuda_ipc_iface_config_t);
UCS_CLASS_CALL_SUPER_INIT(uct_cuda_iface_t, &uct_cuda_ipc_iface_ops,
@@ -538,7 +574,6 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke
}

self->config.max_poll = config->max_poll;
self->config.max_streams = config->max_streams;
self->config.enable_cache = config->enable_cache;
self->config.enable_get_zcopy = config->enable_get_zcopy;
self->config.max_cuda_ipc_events = config->max_cuda_ipc_events;
@@ -557,36 +592,29 @@ static UCS_CLASS_INIT_FUNC(uct_cuda_ipc_iface_t, uct_md_h md, uct_worker_h worke
return UCS_ERR_IO_ERROR;
}

kh_init_inplace(cuda_ipc_queue_desc, &self->queue_desc_map);
ucs_queue_head_init(&self->active_queue);

for (i = 0; i < UCT_CUDA_IPC_MAX_PEERS; i++) {
ucs_queue_head_init(&self->queue_desc[i].event_queue);
}

self->streams_initialized = 0;
self->cuda_context = 0;
self->cuda_context = 0;

return UCS_OK;
}

static UCS_CLASS_CLEANUP_FUNC(uct_cuda_ipc_iface_t)
{
ucs_status_t status;
int i;
CUcontext cuda_context;
uct_cuda_queue_desc_t *q_desc;

UCT_CUDADRV_FUNC_LOG_ERR(cuCtxGetCurrent(&cuda_context));

if (self->streams_initialized &&
uct_cuda_base_context_match(cuda_context, self->cuda_context)) {
for (i = 0; i < self->config.max_streams; i++) {
status = UCT_CUDADRV_FUNC_LOG_ERR(cuStreamDestroy(self->queue_desc[i].stream));
if (UCS_OK != status) {
continue;
}
kh_foreach_value(&self->queue_desc_map, q_desc, {
if (uct_cuda_base_context_match(cuda_context, self->cuda_context)) {
uct_cuda_ipc_queue_desc_cleanup(q_desc);
}
self->streams_initialized = 0;
}
ucs_free(q_desc);
});

kh_destroy_inplace(cuda_ipc_queue_desc, &self->queue_desc_map);

uct_base_iface_progress_disable(&self->super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
10 changes: 5 additions & 5 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

#include <uct/base/uct_iface.h>
#include <uct/cuda/base/cuda_iface.h>
#include <ucs/datastruct/khash.h>
#include <ucs/arch/cpu.h>
#include <cuda.h>

@@ -16,20 +17,18 @@
#include "cuda_ipc_cache.h"


#define UCT_CUDA_IPC_MAX_PEERS 16
KHASH_MAP_INIT_INT(cuda_ipc_queue_desc, uct_cuda_queue_desc_t*);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
KHASH_MAP_INIT_INT(cuda_ipc_queue_desc, uct_cuda_queue_desc_t*);
KHASH_MAP_INIT_INT(cuda_ipc_queue_desc, uct_cuda_queue_desc_t);

then you do not need to malloc/free hash values

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will consider making this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #10538



typedef struct uct_cuda_ipc_iface {
uct_cuda_iface_t super;
ucs_mpool_t event_desc; /* cuda event desc */
int eventfd; /* get event notifications */
int streams_initialized; /* indicates if stream created */
CUcontext cuda_context;
ucs_queue_head_t active_queue;
uct_cuda_queue_desc_t queue_desc[UCT_CUDA_IPC_MAX_PEERS];
khash_t(cuda_ipc_queue_desc) queue_desc_map;
struct {
unsigned max_poll; /* query attempts w.o success */
unsigned max_streams; /* # concurrent streams for || progress*/
unsigned max_cuda_ipc_events; /* max mpool entries */
int enable_cache; /* enable/disable ipc handle cache */
ucs_on_off_auto_value_t enable_get_zcopy; /* enable get_zcopy except for specific platforms */
@@ -41,7 +40,6 @@ typedef struct uct_cuda_ipc_iface {
typedef struct uct_cuda_ipc_iface_config {
uct_iface_config_t super;
unsigned max_poll;
unsigned max_streams;
int enable_cache;
ucs_on_off_auto_value_t enable_get_zcopy;
unsigned max_cuda_ipc_events;
@@ -61,4 +59,6 @@ typedef struct uct_cuda_ipc_event_desc {


ucs_status_t uct_cuda_ipc_iface_init_streams(uct_cuda_ipc_iface_t *iface);
ucs_status_t uct_cuda_ipc_get_queue_desc(uct_cuda_ipc_iface_t *iface, int index,
uct_cuda_queue_desc_t **q_desc_p);
#endif