Skip to content

Commit

Permalink
src: Pass NIC index to transport layer functions
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmarshall21 committed May 8, 2024
1 parent 864fbb2 commit f9b106f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 51 deletions.
12 changes: 6 additions & 6 deletions src/shmem_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ shmem_internal_get_ct(shmemx_ct_t ct, void *target, const void *source, size_t l
{
/* TODO: add shortcut for on-node-comms */
shmem_transport_get_ct((shmem_transport_ct_t *) ct,
target, source, len, pe);
target, source, len, pe, nic_idx);
}


Expand All @@ -164,7 +164,7 @@ shmem_internal_swap(shmem_ctx_t ctx, void *target, void *source, void *dest, siz
if (shmem_shr_transport_use_atomic(ctx, target, len, pe, datatype)) {
shmem_shr_transport_swap(ctx, target, source, dest, len, pe, datatype);
} else {
shmem_transport_swap((shmem_transport_ctx_t *)ctx, target, source, dest, len, pe, datatype);
shmem_transport_swap((shmem_transport_ctx_t *)ctx, target, source, dest, len, pe, datatype, nic_idx);
}
}

Expand Down Expand Up @@ -197,7 +197,7 @@ shmem_internal_cswap(shmem_ctx_t ctx, void *target, void *source, void *dest, vo
shmem_shr_transport_cswap(ctx, target, source, dest, operand, len, pe, datatype);
} else {
shmem_transport_cswap((shmem_transport_ctx_t *)ctx, target, source,
dest, operand, len, pe, datatype);
dest, operand, len, pe, datatype, nic_idx);
}
}

Expand Down Expand Up @@ -230,7 +230,7 @@ shmem_internal_mswap(shmem_ctx_t ctx, void *target, void *source, void *dest, vo
shmem_shr_transport_mswap(ctx, target, source, dest, mask, len, pe, datatype);
} else {
shmem_transport_mswap((shmem_transport_ctx_t *)ctx, target, source,
dest, mask, len, pe, datatype);
dest, mask, len, pe, datatype, nic_idx);
}
}

Expand Down Expand Up @@ -262,7 +262,7 @@ shmem_internal_atomic_fetch(shmem_ctx_t ctx, void *target, const void *source, s
shmem_shr_transport_atomic_fetch(ctx, target, source, len, pe, datatype);
} else {
shmem_transport_atomic_fetch((shmem_transport_ctx_t *)ctx, target,
source, len, pe, datatype);
source, len, pe, datatype, nic_idx);
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ shmem_internal_fetch_atomic(shmem_ctx_t ctx, void *target, void *source, void *d
op, datatype);
} else {
shmem_transport_fetch_atomic((shmem_transport_ctx_t *)ctx, target,
source, dest, len, pe, op, datatype);
source, dest, len, pe, op, datatype, nic_idx);
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/transport_none.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx)
static inline
void
shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
size_t len, int pe, shm_internal_datatype_t datatype)
size_t len, int pe, shm_internal_datatype_t datatype, size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand All @@ -183,7 +183,7 @@ static inline
void
shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *operand, size_t len, int pe,
shm_internal_datatype_t datatype)
shm_internal_datatype_t datatype, size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand All @@ -201,7 +201,7 @@ static inline
void
shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *mask, size_t len, int pe,
shm_internal_datatype_t datatype)
shm_internal_datatype_t datatype, size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand All @@ -225,7 +225,7 @@ shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *so
static inline
void
shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len,
int pe, shm_internal_op_t op, shm_internal_datatype_t datatype)
int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand All @@ -241,7 +241,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, const
static inline
void
shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
int pe, shm_internal_datatype_t datatype)
int pe, shm_internal_datatype_t datatype, size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand Down Expand Up @@ -301,7 +301,8 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void

static inline
void shmem_transport_get_ct(shmem_transport_ct_t *ct, void
*target, const void *source, size_t len, int pe)
*target, const void *source, size_t len, int pe,
size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand Down
26 changes: 13 additions & 13 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1778,21 +1778,21 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id)
ctx->cq = (struct fid_cq **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_cq *));
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
#ifdef USE_CTX_LOCK
ctx->pending_put_cntr[idx] = 0;
ctx->pending_get_cntr[idx] = 0;
ctx->pending_put_cntr[idx] = 0;
ctx->pending_get_cntr[idx] = 0;
#else
shmem_internal_cntr_write(&ctx->pending_put_cntr[idx], 0);
shmem_internal_cntr_write(&ctx->pending_get_cntr[idx], 0);
shmem_internal_cntr_write(&ctx->pending_put_cntr[idx], 0);
shmem_internal_cntr_write(&ctx->pending_get_cntr[idx], 0);
#endif
/* FIX */
//shmem_transport_ofi_eps[idx]->info->ep_attr->tx_ctx_cnt = shmem_transport_ofi_stx_max > 0 ? FI_SHARED_CONTEXT : 0;
//shmem_transport_ofi_eps[idx]->info->caps = FI_RMA | FI_WRITE | FI_READ | FI_ATOMIC | FI_RECV;
//shmem_transport_ofi_eps[idx]->info->tx_attr->op_flags = FI_DELIVERY_COMPLETE;
//shmem_transport_ofi_eps[idx]->info->mode = 0;
//shmem_transport_ofi_eps[idx]->info->tx_attr->mode = 0;
//shmem_transport_ofi_eps[idx]->info->rx_attr->mode = 0;
//shmem_transport_ofi_eps[idx]->info->tx_attr->caps = info->p_info->caps;
//shmem_transport_ofi_eps[idx]->info->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */;
/* FIX */
//shmem_transport_ofi_eps[idx]->info->ep_attr->tx_ctx_cnt = shmem_transport_ofi_stx_max > 0 ? FI_SHARED_CONTEXT : 0;
//shmem_transport_ofi_eps[idx]->info->caps = FI_RMA | FI_WRITE | FI_READ | FI_ATOMIC | FI_RECV;
//shmem_transport_ofi_eps[idx]->info->tx_attr->op_flags = FI_DELIVERY_COMPLETE;
//shmem_transport_ofi_eps[idx]->info->mode = 0;
//shmem_transport_ofi_eps[idx]->info->tx_attr->mode = 0;
//shmem_transport_ofi_eps[idx]->info->rx_attr->mode = 0;
//shmem_transport_ofi_eps[idx]->info->tx_attr->caps = info->p_info->caps;
//shmem_transport_ofi_eps[idx]->info->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */;
#ifdef USE_CTX_LOCK
SHMEM_MUTEX_INIT(ctx->lock);
#endif
Expand Down
31 changes: 17 additions & 14 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const

static inline
void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *operand, size_t len, int pe, int datatype)
const void *operand, size_t len, int pe, int datatype, size_t nic_idx)
{
#ifdef ENABLE_MR_ENDPOINT
/* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE
Expand All @@ -1028,10 +1028,10 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void
shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */

do {
ret = fi_compare_atomic(ctx->ep[1], /* FIX */
ret = fi_compare_atomic(ctx->ep[nic_idx], /* FIXED? */
source,
1,
NULL,
Expand All @@ -1053,7 +1053,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void

static inline
void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *mask, size_t len, int pe, int datatype)
const void *mask, size_t len, int pe, int datatype, size_t nic_idx)
{
int ret = 0;
uint64_t dst = (uint64_t) pe;
Expand All @@ -1067,10 +1067,10 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void
shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */

do {
ret = fi_compare_atomic(ctx->ep[1], /* FIX */
ret = fi_compare_atomic(ctx->ep[nic_idx], /* FIXED? */
source,
1,
NULL,
Expand Down Expand Up @@ -1279,7 +1279,8 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target,
static inline
void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target,
const void *source, void *dest,
size_t len, int pe, int op, int datatype)
size_t len, int pe, int op, int datatype,
size_t nic_idx)
{
#ifdef ENABLE_MR_ENDPOINT
/* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE
Expand All @@ -1300,10 +1301,10 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target,
shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */

do {
ret = fi_fetch_atomic(ctx->ep[1], /* FIX */
ret = fi_fetch_atomic(ctx->ep[nic_idx], /* FIXED */
source,
1,
NULL,
Expand All @@ -1324,10 +1325,11 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target,
static inline
void shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target,
const void *source, void *dest,
size_t len, int pe, int datatype)
size_t len, int pe, int datatype,
size_t nic_idx)
{
shmem_transport_fetch_atomic(ctx, target, source, dest, len, pe,
FI_ATOMIC_WRITE, datatype);
FI_ATOMIC_WRITE, datatype, nic_idx);
}


Expand All @@ -1354,7 +1356,7 @@ void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target,
static inline
void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target,
const void *source, size_t len, int pe,
int datatype)
int datatype, size_t nic_idx)
{
#ifdef ENABLE_MR_ENDPOINT
/* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE
Expand All @@ -1365,7 +1367,7 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target,
target, len, pe, FI_SUM, datatype);
#else
shmem_transport_fetch_atomic(ctx, (void *) source, (const void *) NULL,
target, len, pe, FI_ATOMIC_READ, datatype);
target, len, pe, FI_ATOMIC_READ, datatype, nic_idx);
#endif
}

Expand Down Expand Up @@ -1408,7 +1410,8 @@ void shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target,

static inline
void shmem_transport_get_ct(shmem_transport_ct_t *ct, void *target,
const void *source, size_t len, int pe)
const void *source, size_t len, int pe,
size_t nic_idx)
{
RAISE_ERROR_STR("OFI transport does not currently support CT operations");
}
Expand Down
13 changes: 7 additions & 6 deletions src/transport_portals4.h
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s

static inline
void shmem_transport_get_ct(shmem_transport_ct_t *ct, void *target,
const void *source, size_t len, int pe)
const void *source, size_t len, int pe,
size_t nic_idx)
{
#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING
shmem_transport_portals4_get_internal((shmem_transport_ctx_t *)SHMEM_CTX_DEFAULT, target, source, len, pe, ct->shr_pt, -1);
Expand Down Expand Up @@ -718,7 +719,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx)
static inline
void
shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len,
int pe, ptl_datatype_t datatype)
int pe, ptl_datatype_t datatype, size_t nic_idx)
{
int ret;
ptl_process_t peer;
Expand Down Expand Up @@ -769,7 +770,7 @@ static inline
void
shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *operand, size_t len, int pe,
ptl_datatype_t datatype)
ptl_datatype_t datatype, size_t nic_idx)
{
int ret;
ptl_process_t peer;
Expand Down Expand Up @@ -821,7 +822,7 @@ static inline
void
shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *mask, size_t len, int pe,
ptl_datatype_t datatype)
ptl_datatype_t datatype, size_t nic_idx)
{
int ret;
ptl_process_t peer;
Expand Down Expand Up @@ -1020,7 +1021,7 @@ static inline
void
shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
size_t len, int pe, ptl_op_t op,
ptl_datatype_t datatype)
ptl_datatype_t datatype, size_t nic_idx)
{
int ret;
ptl_pt_index_t pt;
Expand Down Expand Up @@ -1081,7 +1082,7 @@ shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const void
static inline
void
shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
int pe, int datatype)
int pe, int datatype, size_t nic_idx)
{
shmem_internal_assert(len <= shmem_transport_portals4_max_fetch_atomic_size);

Expand Down
13 changes: 7 additions & 6 deletions src/transport_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx)
static inline
void
shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
size_t len, int pe, shm_internal_datatype_t datatype)
size_t len, int pe, shm_internal_datatype_t datatype, size_t nic_idx)
{
uint8_t *remote_addr;
ucp_rkey_h rkey;
Expand Down Expand Up @@ -402,7 +402,7 @@ static inline
void
shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *operand, size_t len, int pe,
shm_internal_datatype_t datatype)
shm_internal_datatype_t datatype, size_t nic_idx)
{
uint8_t *remote_addr;
ucp_rkey_h rkey;
Expand Down Expand Up @@ -530,7 +530,7 @@ shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *so
static inline
void
shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len,
int pe, shm_internal_op_t op, shm_internal_datatype_t datatype)
int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx)
{
uint8_t *remote_addr;
ucp_rkey_h rkey;
Expand Down Expand Up @@ -613,7 +613,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, const
static inline
void
shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
int pe, shm_internal_datatype_t datatype)
int pe, shm_internal_datatype_t datatype, size_t nic_idx)
{
uint8_t *remote_addr;
ucp_rkey_h rkey;
Expand Down Expand Up @@ -675,7 +675,7 @@ static inline
void
shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest,
const void *mask, size_t len, int pe,
shm_internal_datatype_t datatype)
shm_internal_datatype_t datatype, size_t nic_idx)
{
uint8_t *remote_addr;
ucp_rkey_h rkey;
Expand Down Expand Up @@ -779,7 +779,8 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void

static inline
void shmem_transport_get_ct(shmem_transport_ct_t *ct, void
*target, const void *source, size_t len, int pe)
*target, const void *source, size_t len, int pe,
size_t nic_idx)
{
RAISE_ERROR_STR("No path to peer");
}
Expand Down

0 comments on commit f9b106f

Please sign in to comment.