Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/IB/EFA/SRD: Initial add of endpoint #10447

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion src/uct/ib/efa/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ libuct_ib_efa_la_LDFLAGS = $(EFA_LIB) $(IBVERBS_LDFLAGS) \
-version-info $(SOVERSION)

libuct_ib_efa_la_SOURCES = base/ib_efa_md.c \
srd/srd_iface.c
srd/srd_iface.c \
srd/srd_ep.c

noinst_HEADERS = base/ib_efa.h \
srd/srd_iface.h \
srd/srd_ep.h \
srd/srd_def.h

PKG_CONFIG_NAME=ib-efa
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/efa/srd/srd_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include <ucs/datastruct/frag_list.h>


typedef ucs_frag_list_sn_t uct_srd_psn_t;


typedef struct uct_srd_neth {
uint32_t packet_type;
} UCS_S_PACKED uct_srd_neth_t;
Expand All @@ -26,4 +29,10 @@ typedef struct uct_srd_ep_addr {
uct_ib_uint24_t ep_id;
} uct_srd_ep_addr_t;


typedef struct uct_srd_ep_peer_address {
struct ibv_ah *ah;
uint32_t dest_qpn;
} uct_srd_ep_peer_address_t;

#endif
155 changes: 155 additions & 0 deletions src/uct/ib/efa/srd/srd_ep.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2025. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "srd_ep.h"
#include "srd_iface.h"

#include <ucs/arch/atomic.h>


static void uct_srd_ep_set_dest_ep_id(uct_srd_ep_t *ep, uint32_t dest_id)
{
ucs_assert(dest_id != UCT_SRD_EP_NULL_ID);
ep->dest_ep_id = dest_id;
ep->flags |= UCT_SRD_EP_FLAG_CONNECTED;
}

static ucs_status_t
uct_srd_ep_connect_to_iface(uct_srd_ep_t *ep, const uct_ib_address_t *ib_addr,
const uct_srd_iface_addr_t *if_addr)
{
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
char buf[128];

ucs_debug(UCT_IB_IFACE_FMT " lid %d qpn 0x%x epid %u ep %p connected to "
"IFACE %s qpn 0x%x",
UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id, ep,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(if_addr->qp_num));

return UCS_OK;
}

/* FIXME: Replace by CEP connection management */
static uint64_t uct_srd_ep_conn_sn;

static UCS_CLASS_INIT_FUNC(uct_srd_ep_t, const uct_ep_params_t *params)
{
uct_srd_iface_t *iface = ucs_derived_of(params->iface,
uct_srd_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t*)
params->dev_addr;
const uct_srd_iface_addr_t *if_addr = (const uct_srd_iface_addr_t*)
params->iface_addr;
ucs_status_t status;

memset(self, 0, sizeof(*self));

UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

self->dest_ep_id = UCT_SRD_EP_NULL_ID;
self->path_index = UCT_EP_PARAMS_GET_PATH_INDEX(params);
self->tx.psn = UCT_SRD_INITIAL_PSN;
self->rx_creq_count = 0;
ucs_frag_list_init(self->tx.psn - 1, &self->rx.ooo_pkts,
-1 UCS_STATS_ARG(self->super.stats));

uct_srd_iface_add_ep(iface, self);
ucs_debug("created ep ep=%p iface=%p ep_id=%d", self, iface, self->ep_id);

if (!ucs_test_all_flags(params->field_mask,
UCT_EP_PARAM_FIELD_DEV_ADDR |
UCT_EP_PARAM_FIELD_IFACE_ADDR)) {
return UCS_OK;
}

self->conn_sn = ucs_atomic_fadd64(&uct_srd_ep_conn_sn, 1);

/* Connect it to the interface */
status = uct_srd_ep_connect_to_iface(self, ib_addr, if_addr);
if (status != UCS_OK) {
goto err_ep_destroy;
}

/* Generate peer address */
status = uct_srd_iface_unpack_peer_address(iface, ib_addr, if_addr,
self->path_index,
&self->peer_address);
if (status != UCS_OK) {
goto err_ep_destroy;
}

return UCS_OK;

err_ep_destroy:
UCS_CLASS_DELETE_FUNC_NAME(uct_srd_ep_t)(&self->super.super);
return status;
}


static UCS_CLASS_CLEANUP_FUNC(uct_srd_ep_t)
{
uct_srd_iface_t *iface = ucs_derived_of(self->super.super.iface,
uct_srd_iface_t);

uct_srd_iface_remove_ep(iface, self);
yosefe marked this conversation as resolved.
Show resolved Hide resolved
ucs_frag_list_cleanup(&self->rx.ooo_pkts);
}

UCS_CLASS_DEFINE(uct_srd_ep_t, uct_base_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_srd_ep_t, uct_ep_t, const uct_ep_params_t*);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_srd_ep_t, uct_ep_t);

ucs_status_t
uct_srd_ep_connect_to_ep_v2(uct_ep_h tl_ep, const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *uct_ep_addr,
const uct_ep_connect_to_ep_params_t *params)
{
uct_srd_ep_t *ep = ucs_derived_of(tl_ep, uct_srd_ep_t);
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t*)dev_addr;
const uct_srd_ep_addr_t *ep_addr = (const uct_srd_ep_addr_t*)uct_ep_addr;
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
char buf[128];

ucs_assert_always(ep->dest_ep_id == UCT_SRD_EP_NULL_ID);
ucs_trace_func("");

uct_srd_ep_set_dest_ep_id(ep, uct_ib_unpack_uint24(ep_addr->ep_id));

ucs_debug(UCT_IB_IFACE_FMT " slid %d qpn 0x%x epid %u connected to %s "
"qpn 0x%x epid %u",
UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(ep_addr->iface_addr.qp_num), ep->dest_ep_id);

return uct_srd_iface_unpack_peer_address(iface, ib_addr,
&ep_addr->iface_addr,
ep->path_index, &ep->peer_address);
}

ucs_status_t uct_srd_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
uct_srd_ep_t *ep = ucs_derived_of(tl_ep, uct_srd_ep_t);
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
uct_srd_ep_addr_t *ep_addr = (uct_srd_ep_addr_t*)addr;

uct_ib_pack_uint24(ep_addr->iface_addr.qp_num, iface->qp->qp_num);
uct_ib_pack_uint24(ep_addr->ep_id, ep->ep_id);
return UCS_OK;
}
56 changes: 56 additions & 0 deletions src/uct/ib/efa/srd/srd_ep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2025. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCT_SRD_EP_H
#define UCT_SRD_EP_H

#include "srd_def.h"


#define UCT_SRD_INITIAL_PSN 1
#define UCT_SRD_EP_NULL_ID UCS_MASK(24)

enum {
UCT_SRD_EP_FLAG_CONNECTED = UCS_BIT(0), /* EP is connected to the peer */
UCT_SRD_EP_FLAG_PRIVATE = UCS_BIT(1), /* EP was created as internal */
UCT_SRD_EP_FLAG_HAS_PENDING = UCS_BIT(2), /* EP has some pending requests */
};

typedef uint32_t uct_srd_ep_conn_sn_t;

typedef struct uct_srd_ep {
uct_base_ep_t super;
uct_srd_ep_conn_sn_t conn_sn;

uint32_t ep_id;
uint32_t dest_ep_id;
uint8_t path_index;

/* connection sequence number. assigned in connect_to_iface() */
uint8_t rx_creq_count;
uint16_t flags;
uct_srd_ep_peer_address_t peer_address;

struct {
ucs_frag_list_t ooo_pkts; /* Out of order packets that
can not be processed yet */
} rx;
struct {
uct_srd_psn_t psn; /* Next PSN to send */
} tx;
} uct_srd_ep_t;


ucs_status_t uct_srd_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);
ucs_status_t
uct_srd_ep_connect_to_ep_v2(uct_ep_h tl_ep, const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *uct_ep_addr,
const uct_ep_connect_to_ep_params_t *params);

UCS_CLASS_DECLARE_NEW_FUNC(uct_srd_ep_t, uct_ep_t, const uct_ep_params_t*);
UCS_CLASS_DECLARE_DELETE_FUNC(uct_srd_ep_t, uct_ep_t);

#endif
56 changes: 48 additions & 8 deletions src/uct/ib/efa/srd/srd_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,49 @@

static uct_iface_ops_t uct_srd_iface_tl_ops;

void uct_srd_iface_add_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep)
{
ep->ep_id = ucs_ptr_array_insert(&iface->eps, ep);
ucs_trace("iface(%p) added ep=%p ep_id=%d", iface, ep, ep->ep_id);
}

void uct_srd_iface_remove_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep)
{
ucs_assertv(ep->ep_id != UCT_UD_EP_NULL_ID, "iface=%p ep=%p ep_id=%d",
iface, ep, ep->ep_id);

ucs_ptr_array_remove(&iface->eps, ep->ep_id);
ucs_trace("iface(%p) removed ep=%p ep_id=%d", iface, ep, ep->ep_id);
}

ucs_status_t
uct_srd_iface_unpack_peer_address(uct_srd_iface_t *iface,
const uct_ib_address_t *ib_addr,
const uct_srd_iface_addr_t *if_addr,
int path_index, void *address_p)
{
uct_ib_iface_t *ib_iface = &iface->super;
uct_srd_ep_peer_address_t *peer_address = (uct_srd_ep_peer_address_t*)
address_p;
struct ibv_ah_attr ah_attr;
enum ibv_mtu path_mtu;
ucs_status_t status;

memset(peer_address, 0, sizeof(*peer_address));

uct_ib_iface_fill_ah_attr_from_addr(ib_iface, ib_addr, path_index, &ah_attr,
&path_mtu);
status = uct_ib_iface_create_ah(ib_iface, &ah_attr, "SRD connect",
&peer_address->ah);
if (status != UCS_OK) {
return status;
}

peer_address->dest_qpn = uct_ib_unpack_uint24(if_addr->qp_num);

return UCS_OK;
}

ucs_status_t
uct_srd_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *iface_addr)
{
Expand All @@ -36,6 +79,7 @@ static uct_ib_iface_ops_t uct_srd_iface_ops = {
ucs_empty_function_return_unsupported,
.ep_invalidate = (uct_ep_invalidate_func_t)
ucs_empty_function_return_unsupported,
.ep_connect_to_ep_v2 = uct_srd_ep_connect_to_ep_v2,
.iface_is_reachable_v2 = uct_ib_iface_is_reachable_v2,
},
.create_cq = uct_ib_verbs_create_cq,
Expand Down Expand Up @@ -335,14 +379,10 @@ static uct_iface_ops_t uct_srd_iface_tl_ops = {
ucs_empty_function_return_unsupported,
.ep_fence = (uct_ep_fence_func_t)
ucs_empty_function_return_unsupported,
.ep_create = (uct_ep_create_func_t)
ucs_empty_function_return_unsupported,
.ep_get_address = (uct_ep_get_address_func_t)
ucs_empty_function_return_unsupported,
.ep_connect_to_ep = (uct_ep_connect_to_ep_func_t)
ucs_empty_function_return_unsupported,
.ep_destroy = (uct_ep_destroy_func_t)
ucs_empty_function_return_unsupported,
.ep_create = UCS_CLASS_NEW_FUNC_NAME(uct_srd_ep_t),
.ep_get_address = uct_srd_ep_get_address,
.ep_connect_to_ep = uct_base_ep_connect_to_ep,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_srd_ep_t),
.ep_am_bcopy = (uct_ep_am_bcopy_func_t)
ucs_empty_function_return_unsupported,
.ep_am_zcopy = (uct_ep_am_zcopy_func_t)
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/efa/srd/srd_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define UCT_SRD_IFACE_H

#include "srd_def.h"
#include "srd_ep.h"

#include <uct/ib/ud/base/ud_iface_common.h>

Expand Down Expand Up @@ -56,6 +57,14 @@ typedef struct uct_srd_iface {
} uct_srd_iface_t;


void uct_srd_iface_add_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep);
void uct_srd_iface_remove_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep);

ucs_status_t uct_srd_iface_unpack_peer_address(uct_srd_iface_t *iface,
const uct_ib_address_t *ib_addr,
const uct_srd_iface_addr_t *if_addr,
int path_index, void *address_p);

END_C_DECLS

#endif
4 changes: 4 additions & 0 deletions test/gtest/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ if HAVE_MLX5_DV
gtest_SOURCES += \
uct/ib/test_cqe_zipping.cc
endif
if HAVE_EFA
gtest_SOURCES += \
uct/ib/test_srd.cc
endif
endif # HAVE_IB

if HAVE_CUDA
Expand Down
Loading
Loading