From 3423c0228d6a9e7990f52d670ff5331b7f6d35d2 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 13 Aug 2024 17:37:18 -0500 Subject: [PATCH 1/6] mpid: add MPIR_Data and MPID_Send/Recv_data Add new MPID send/recv interfaces that accept a data pointer rather than the (buf, count, datatype) triplet. This allows carrying more information for extensions. For example, we can support data chunks that often occur in pipelining algorithms. We may also extend MPIR_Data to add memory attributes. --- src/include/mpir_misc.h | 10 ++++++++++ src/mpid/ch4/include/mpidch4.h | 4 ++++ src/mpid/ch4/src/ch4_send.h | 16 ++++++++++++++++ src/mpid/ch4/subconfigure.m4 | 2 ++ 4 files changed, 32 insertions(+) diff --git a/src/include/mpir_misc.h b/src/include/mpir_misc.h index 50c490357f0..066e23ddbca 100644 --- a/src/include/mpir_misc.h +++ b/src/include/mpir_misc.h @@ -14,6 +14,16 @@ #define MPIR_FINALIZE_CALLBACK_DEFAULT_PRIO 0 #define MPIR_FINALIZE_CALLBACK_MAX_PRIO 10 +/* Misc. declarations that need be included before e.g. mpidpre.h */ + +typedef struct MPIR_Data { + void *buf; + MPI_Aint count; + MPI_Datatype datatype; + MPI_Aint offset; + MPI_Aint length; +} MPIR_Data; + /* Define a typedef for the errflag value used by many internal * functions. If an error needs to be returned, these values can be * used to signal such. 
More details can be found further down in the diff --git a/src/mpid/ch4/include/mpidch4.h b/src/mpid/ch4/include/mpidch4.h index 3dd3528efbc..22e316c329c 100644 --- a/src/mpid/ch4/include/mpidch4.h +++ b/src/mpid/ch4/include/mpidch4.h @@ -65,12 +65,16 @@ int MPID_Progress_activate(int id); int MPID_Progress_deactivate(int id); MPL_STATIC_INLINE_PREFIX int MPID_Recv(void *, MPI_Aint, MPI_Datatype, int, int, MPIR_Comm *, int, MPI_Status *, MPIR_Request **) MPL_STATIC_INLINE_SUFFIX; +MPL_STATIC_INLINE_PREFIX int MPID_Recv_data(MPIR_Data * data, int source, int tag, MPIR_Comm * comm, + int attr, MPIR_Request ** req) MPL_STATIC_INLINE_SUFFIX; int MPID_Recv_init(void *, MPI_Aint, MPI_Datatype, int, int, MPIR_Comm *, int, MPIR_Request **); MPL_STATIC_INLINE_PREFIX void MPID_Request_set_completed(MPIR_Request *) MPL_STATIC_INLINE_SUFFIX; MPL_STATIC_INLINE_PREFIX int MPID_Request_complete(MPIR_Request *) MPL_STATIC_INLINE_SUFFIX; MPL_STATIC_INLINE_PREFIX int MPID_Request_is_anysource(MPIR_Request *) MPL_STATIC_INLINE_SUFFIX; MPL_STATIC_INLINE_PREFIX int MPID_Send(const void *, MPI_Aint, MPI_Datatype, int, int, MPIR_Comm *, int, MPIR_Request **) MPL_STATIC_INLINE_SUFFIX; +MPL_STATIC_INLINE_PREFIX int MPID_Send_data(MPIR_Data * data, int dest, int tag, MPIR_Comm * comm, + int attr, MPIR_Request **) MPL_STATIC_INLINE_SUFFIX; MPL_STATIC_INLINE_PREFIX int MPID_Ssend(const void *, MPI_Aint, MPI_Datatype, int, int, MPIR_Comm *, int, MPIR_Request **) MPL_STATIC_INLINE_SUFFIX; MPL_STATIC_INLINE_PREFIX int MPID_Rsend(const void *, MPI_Aint, MPI_Datatype, int, int, MPIR_Comm *, diff --git a/src/mpid/ch4/src/ch4_send.h b/src/mpid/ch4/src/ch4_send.h index 1e78055f081..d16d173978a 100644 --- a/src/mpid/ch4/src/ch4_send.h +++ b/src/mpid/ch4/src/ch4_send.h @@ -175,4 +175,20 @@ MPL_STATIC_INLINE_PREFIX int MPID_Cancel_send(MPIR_Request * sreq) goto fn_exit; } +MPL_STATIC_INLINE_PREFIX int MPID_Send_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** 
request) +{ + int mpi_errno = MPI_SUCCESS; + MPIDI_av_entry_t *av = NULL; + MPIR_FUNC_ENTER; + + MPIR_Assert(0); + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} + #endif /* CH4_SEND_H_INCLUDED */ diff --git a/src/mpid/ch4/subconfigure.m4 b/src/mpid/ch4/subconfigure.m4 index 2d8f35813db..53f73230cb8 100644 --- a/src/mpid/ch4/subconfigure.m4 +++ b/src/mpid/ch4/subconfigure.m4 @@ -14,6 +14,8 @@ AM_CONDITIONAL([BUILD_CH4],[test "$device_name" = "ch4"]) AM_COND_IF([BUILD_CH4],[ AC_MSG_NOTICE([RUNNING PREREQ FOR CH4 DEVICE]) +AC_DEFINE([HAVE_MPIR_DATA], 1, [Define if the device layer supports MPIR_Data.]) + # check availability of libfabric, ucx (for purpose of setting default) m4_define([libfabric_embedded_dir],[modules/libfabric]) m4_define([ucx_embedded_dir],[modules/ucx]) From a43ce4519927db7a00cf29378a45b87a46a06a21 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 14 Aug 2024 10:41:29 -0500 Subject: [PATCH 2/6] misc: add offset and length to Localcopy interfaces Add sendlength input to MPIR_Localcopy_gpu and MPIR_Ilocalcopy_gpu; add sendoffset, sendlength, recvoffset inputs to MPIR_Ilocalcopy. This is to support partial/chunk data copy. MPIR_Localcopy is left alone to minimize backward impact. 
--- src/include/mpir_gpu_util.h | 4 +- src/include/mpir_misc.h | 20 ++++---- src/mpi/misc/utils.c | 71 ++++++++++++++++------------ src/mpid/ch4/netmod/ofi/ofi_events.c | 7 +-- src/mpid/ch4/netmod/ofi/ofi_events.h | 2 +- src/mpid/ch4/netmod/ofi/ofi_impl.h | 4 +- src/mpid/ch4/netmod/ofi/ofi_send.h | 4 +- src/mpid/ch4/shm/ipc/gpu/gpu_post.c | 2 +- src/mpid/ch4/shm/posix/posix_rma.h | 15 +++--- src/mpid/ch4/src/ch4_coll_impl.h | 6 +-- 10 files changed, 75 insertions(+), 60 deletions(-) diff --git a/src/include/mpir_gpu_util.h b/src/include/mpir_gpu_util.h index 36f487cad9f..c0702f108d3 100644 --- a/src/include/mpir_gpu_util.h +++ b/src/include/mpir_gpu_util.h @@ -83,7 +83,7 @@ MPL_STATIC_INLINE_PREFIX void MPIR_gpu_host_swap_gpu(const void *buf, MPI_Aint c MPL_pointer_attr_t attr, void *host_buf) { if (host_buf) { - MPIR_Localcopy_gpu(buf, count, datatype, 0, &attr, host_buf, count, datatype, 0, NULL, + MPIR_Localcopy_gpu(buf, count, datatype, 0, -1, &attr, host_buf, count, datatype, 0, NULL, MPL_GPU_COPY_DIRECTION_NONE, MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH, true); } @@ -99,7 +99,7 @@ MPL_STATIC_INLINE_PREFIX void MPIR_gpu_swap_back(void *host_buf, void *gpu_buf, MPL_STATIC_INLINE_PREFIX void MPIR_gpu_swap_back_gpu(void *host_buf, void *gpu_buf, MPI_Aint count, MPI_Datatype datatype, MPL_pointer_attr_t attr) { - MPIR_Localcopy_gpu(host_buf, count, datatype, 0, NULL, gpu_buf, count, datatype, 0, &attr, + MPIR_Localcopy_gpu(host_buf, count, datatype, 0, -1, NULL, gpu_buf, count, datatype, 0, &attr, MPL_GPU_COPY_DIRECTION_NONE, MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH, true); } diff --git a/src/include/mpir_misc.h b/src/include/mpir_misc.h index 066e23ddbca..f960057dbaa 100644 --- a/src/include/mpir_misc.h +++ b/src/include/mpir_misc.h @@ -84,20 +84,22 @@ typedef struct { int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype); int MPIR_Ilocalcopy(const void *sendbuf, MPI_Aint 
sendcount, MPI_Datatype sendtype, - void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, + MPI_Aint sendoffset, MPI_Aint sendlength, + void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPIR_Typerep_req * typerep_req); int MPIR_Localcopy_stream(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, void *stream); int MPIR_Localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf, - MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, - MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, - MPL_gpu_engine_type_t enginetype, bool commit); + MPI_Aint sendoffset, MPI_Aint sendlength, MPL_pointer_attr_t * sendattr, + void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, + MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, + MPL_gpu_copy_direction_t dir, MPL_gpu_engine_type_t enginetype, bool commit); int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf, - MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, - MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, - MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req); + MPI_Aint sendoffset, MPI_Aint sendlength, MPL_pointer_attr_t * sendattr, + void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, + MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, + MPL_gpu_copy_direction_t dir, MPL_gpu_engine_type_t enginetype, + bool commit, MPIR_gpu_req * req); /* Contiguous datatype calculates buffer address with `(char *) buf + dt_true_lb`. 
* However, dt_true_lb is treated as ptrdiff_t (signed), and when buf is MPI_BOTTOM diff --git a/src/mpi/misc/utils.c b/src/mpi/misc/utils.c index 88de7f1b40c..56c49481854 100644 --- a/src/mpi/misc/utils.c +++ b/src/mpi/misc/utils.c @@ -14,8 +14,10 @@ enum { LOCALCOPY_STREAM, }; +/* sendoffset, recvoffset, and sendlength enable partial data chunk copy. Use offset 0 and sendlength -1 + * to copy the entire data. */ static int do_localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, void *recvbuf, MPI_Aint recvcount, + MPI_Aint sendoffset, MPI_Aint sendlength, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, int localcopy_kind, void *extra_param) { @@ -41,9 +43,13 @@ static int do_localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se if (!sdata_sz || !rdata_sz) goto fn_exit; - copy_sz = sdata_sz; - if (copy_sz > rdata_sz) - copy_sz = rdata_sz; + if (sendlength == -1) { + copy_sz = sdata_sz; + if (copy_sz > rdata_sz) + copy_sz = rdata_sz; + } else { + copy_sz = sendlength; + } /* Builtin types is the common case; optimize for it */ MPIR_Datatype_is_contig(sendtype, &sendtype_iscontig); @@ -185,7 +191,8 @@ static int do_localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se #ifdef MPL_HAVE_GPU static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, MPL_pointer_attr_t * send_attr, void *recvbuf, + MPI_Aint sendoffset, MPI_Aint sendlength, + MPL_pointer_attr_t * send_attr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recv_attr, MPL_gpu_copy_direction_t dir, MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * gpu_req) @@ -213,9 +220,13 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp if (!sdata_sz || !rdata_sz) goto fn_exit; - copy_sz = sdata_sz; - if (copy_sz > rdata_sz) - copy_sz = rdata_sz; + if (sendlength == -1) { + 
copy_sz = sdata_sz; + if (copy_sz > rdata_sz) + copy_sz = rdata_sz; + } else { + copy_sz = sendlength; + } /* This case is specific for contig datatypes */ MPIR_Datatype_is_contig(sendtype, &sendtype_iscontig); @@ -327,9 +338,8 @@ int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtyp MPIR_FUNC_ENTER; - mpi_errno = - do_localcopy(sendbuf, sendcount, sendtype, 0, recvbuf, recvcount, recvtype, 0, - LOCALCOPY_BLOCKING, NULL); + mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, 0, -1, recvbuf, recvcount, recvtype, 0, + LOCALCOPY_BLOCKING, NULL); MPIR_ERR_CHECK(mpi_errno); fn_exit: @@ -340,15 +350,17 @@ int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtyp } int MPIR_Ilocalcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, + MPI_Aint sendoffset, MPI_Aint sendlength, + void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPIR_Typerep_req * typerep_req) { int mpi_errno = MPI_SUCCESS; MPIR_FUNC_ENTER; - mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, 0, recvbuf, recvcount, recvtype, - 0, LOCALCOPY_NONBLOCKING, typerep_req); + mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, sendoffset, sendlength, + recvbuf, recvcount, recvtype, recvoffset, + LOCALCOPY_NONBLOCKING, typerep_req); MPIR_ERR_CHECK(mpi_errno); fn_exit: @@ -370,7 +382,7 @@ int MPIR_Localcopy_stream(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype MPIR_FUNC_ENTER; - mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, 0, recvbuf, recvcount, + mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, 0, -1, recvbuf, recvcount, recvtype, 0, LOCALCOPY_STREAM, stream); MPIR_ERR_CHECK(mpi_errno); @@ -382,7 +394,8 @@ int MPIR_Localcopy_stream(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype } int MPIR_Localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void 
*recvbuf, + MPI_Aint sendoffset, MPI_Aint sendlength, + MPL_pointer_attr_t * sendattr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, MPL_gpu_engine_type_t enginetype, bool commit) @@ -392,14 +405,13 @@ int MPIR_Localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sen MPIR_FUNC_ENTER; #ifdef MPL_HAVE_GPU - mpi_errno = - do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendattr, recvbuf, recvcount, - recvtype, recvoffset, recvattr, dir, enginetype, commit, NULL); + mpi_errno = do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendlength, sendattr, + recvbuf, recvcount, recvtype, recvoffset, recvattr, + dir, enginetype, commit, NULL); MPIR_ERR_CHECK(mpi_errno); #else - mpi_errno = - do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype, - recvoffset, LOCALCOPY_BLOCKING, NULL); + mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, sendoffset, sendlength, + recvbuf, recvcount, recvtype, recvoffset, LOCALCOPY_BLOCKING, NULL); MPIR_ERR_CHECK(mpi_errno); #endif @@ -411,7 +423,8 @@ int MPIR_Localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sen } int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, - MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf, + MPI_Aint sendoffset, MPI_Aint sendlength, + MPL_pointer_attr_t * sendattr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req) @@ -421,14 +434,14 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se MPIR_FUNC_ENTER; #ifdef MPL_HAVE_GPU - mpi_errno = - do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendattr, recvbuf, recvcount, - recvtype, recvoffset, recvattr, dir, enginetype, commit, req); + mpi_errno = 
do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendlength, sendattr, + recvbuf, recvcount, recvtype, recvoffset, recvattr, + dir, enginetype, commit, req); MPIR_ERR_CHECK(mpi_errno); #else - mpi_errno = - do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype, - recvoffset, LOCALCOPY_NONBLOCKING, &req->u.y_req); + mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, sendoffset, sendoffset, + recvbuf, recvcount, recvtype, recvoffset, + LOCALCOPY_NONBLOCKING, &req->u.y_req); MPIR_ERR_CHECK(mpi_errno); req->type = MPIR_TYPEREP_REQUEST; #endif diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.c b/src/mpid/ch4/netmod/ofi/ofi_events.c index 84046946d11..a39475d2d90 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.c +++ b/src/mpid/ch4/netmod/ofi/ofi_events.c @@ -150,8 +150,9 @@ static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, MPI_Aint actual_unpack_bytes; MPIR_gpu_req yreq; mpi_errno = - MPIR_Ilocalcopy_gpu(wc_buf, wc->len, MPI_BYTE, 0, NULL, recv_buf, recv_count, - datatype, 0, NULL, MPL_GPU_COPY_H2D, engine_type, 1, &yreq); + MPIR_Ilocalcopy_gpu(wc_buf, wc->len, MPI_BYTE, 0, -1, NULL, recv_buf, + recv_count, datatype, 0, NULL, MPL_GPU_COPY_H2D, + engine_type, 1, &yreq); MPIR_ERR_CHECK(mpi_errno); actual_unpack_bytes = wc->len; task = @@ -216,7 +217,7 @@ static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, MPI_Aint actual_unpack_bytes; MPIR_gpu_req yreq; mpi_errno = - MPIR_Ilocalcopy_gpu(wc_buf, (MPI_Aint) wc->len, MPI_BYTE, 0, NULL, + MPIR_Ilocalcopy_gpu(wc_buf, (MPI_Aint) wc->len, MPI_BYTE, 0, -1, NULL, (char *) recv_buf, (MPI_Aint) recv_count, datatype, MPIDI_OFI_REQUEST(rreq, pipeline_info.offset), NULL, MPL_GPU_COPY_H2D, engine_type, 1, &yreq); diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.h b/src/mpid/ch4/netmod/ofi/ofi_events.h index 036a49b5671..d6370b0b0f1 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.h +++ b/src/mpid/ch4/netmod/ofi/ofi_events.h @@ -109,7 +109,7 
@@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_event(int vci, struct fi_cq_tagged_e actual_unpack_bytes = wc->len; mpi_errno = MPIR_Localcopy_gpu(MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer), count, - MPI_BYTE, 0, NULL, recv_buf, count, MPI_BYTE, 0, &attr, + MPI_BYTE, 0, -1, NULL, recv_buf, count, MPI_BYTE, 0, &attr, MPL_GPU_COPY_DIRECTION_NONE, engine, true); MPIR_ERR_CHECK(mpi_errno); } else { diff --git a/src/mpid/ch4/netmod/ofi/ofi_impl.h b/src/mpid/ch4/netmod/ofi/ofi_impl.h index 493980377f5..b9f4c090a07 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_impl.h @@ -931,8 +931,8 @@ static int MPIDI_OFI_gpu_progress_send(void) commit = 1; mpi_errno = MPIR_Ilocalcopy_gpu((char *) send_task->send_buf, send_task->count, datatype, - send_task->offset, &send_task->attr, host_buf, chunk_sz, - MPI_BYTE, 0, NULL, MPL_GPU_COPY_D2H, engine_type, + send_task->offset, chunk_sz, &send_task->attr, host_buf, + chunk_sz, MPI_BYTE, 0, NULL, MPL_GPU_COPY_D2H, engine_type, commit, &yreq); MPIR_ERR_CHECK(mpi_errno); actual_pack_bytes = chunk_sz; diff --git a/src/mpid/ch4/netmod/ofi/ofi_send.h b/src/mpid/ch4/netmod/ofi/ofi_send.h index 423f44dd017..c52ae908c65 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_send.h +++ b/src/mpid/ch4/netmod/ofi/ofi_send.h @@ -356,7 +356,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou MPIDI_OFI_gpu_get_send_engine_type(MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE); if (dt_contig && engine != MPL_GPU_ENGINE_TYPE_LAST && MPL_gpu_query_pointer_is_dev(send_buf, &attr)) { - mpi_errno = MPIR_Localcopy_gpu(send_buf, data_sz, MPI_BYTE, 0, &attr, + mpi_errno = MPIR_Localcopy_gpu(send_buf, data_sz, MPI_BYTE, 0, -1, &attr, MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer), data_sz, MPI_BYTE, 0, NULL, MPL_GPU_COPY_DIRECTION_NONE, engine, true); @@ -619,7 +619,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send(const void *buf, MPI_Aint count, MPI 
MPIDI_OFI_gpu_get_send_engine_type(MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE); if (engine != MPL_GPU_ENGINE_TYPE_LAST) { mpi_errno = - MPIR_Localcopy_gpu(send_buf, data_sz, MPI_BYTE, 0, &attr, host_buf, + MPIR_Localcopy_gpu(send_buf, data_sz, MPI_BYTE, 0, -1, &attr, host_buf, data_sz, MPI_BYTE, 0, NULL, MPL_GPU_COPY_DIRECTION_NONE, engine, true); MPIR_ERR_CHECK(mpi_errno); diff --git a/src/mpid/ch4/shm/ipc/gpu/gpu_post.c b/src/mpid/ch4/shm/ipc/gpu/gpu_post.c index f706dd3e522..e0e1d282ca5 100644 --- a/src/mpid/ch4/shm/ipc/gpu/gpu_post.c +++ b/src/mpid/ch4/shm/ipc/gpu/gpu_post.c @@ -704,7 +704,7 @@ int MPIDI_GPU_copy_data_async(MPIDI_IPC_hdr * ipc_hdr, MPIR_Request * rreq, MPI_ MPIR_gpu_req yreq; MPL_gpu_engine_type_t engine = MPIDI_IPCI_choose_engine(ipc_hdr->ipc_handle.gpu.global_dev_id, dev_id); - mpi_errno = MPIR_Ilocalcopy_gpu(src_buf, src_count, src_dt, 0, NULL, + mpi_errno = MPIR_Ilocalcopy_gpu(src_buf, src_count, src_dt, 0, -1, NULL, MPIDIG_REQUEST(rreq, buffer), MPIDIG_REQUEST(rreq, count), MPIDIG_REQUEST(rreq, datatype), 0, &attr, MPL_GPU_COPY_DIRECTION_NONE, engine, true, &yreq); diff --git a/src/mpid/ch4/shm/posix/posix_rma.h b/src/mpid/ch4/shm/posix/posix_rma.h index 64b84fdbfc0..0d848d014b6 100644 --- a/src/mpid/ch4/shm/posix/posix_rma.h +++ b/src/mpid/ch4/shm/posix/posix_rma.h @@ -167,14 +167,14 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_do_put(const void *origin_addr, if (winattr & MPIDI_WINATTR_MR_PREFERRED) { MPIR_gpu_req yreq; mpi_errno = - MPIR_Ilocalcopy_gpu(origin_addr, origin_count, origin_datatype, 0, &origin_attr, + MPIR_Ilocalcopy_gpu(origin_addr, origin_count, origin_datatype, 0, -1, &origin_attr, target_addr, target_count, target_datatype, 0, &target_attr, MPL_GPU_COPY_DIRECTION_NONE, engine_type, 1, &yreq); if (yreq.type != MPIR_NULL_REQUEST) MPIDI_POSIX_rma_outstanding_req_enqueu(yreq, &win->dev.shm.posix); } else { mpi_errno = - MPIR_Localcopy_gpu(origin_addr, origin_count, origin_datatype, 0, &origin_attr, + 
MPIR_Localcopy_gpu(origin_addr, origin_count, origin_datatype, 0, -1, &origin_attr, target_addr, target_count, target_datatype, 0, &target_attr, MPL_GPU_COPY_DIRECTION_NONE, engine_type, 1); } @@ -186,8 +186,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_do_put(const void *origin_addr, * increase per-op+flush overhead. */ MPIR_gpu_req yreq; yreq.type = MPIR_TYPEREP_REQUEST; - mpi_errno = MPIR_Ilocalcopy(origin_addr, origin_count, origin_datatype, - target_addr, target_count, target_datatype, &yreq.u.y_req); + mpi_errno = MPIR_Ilocalcopy(origin_addr, origin_count, origin_datatype, 0, -1, + target_addr, target_count, target_datatype, 0, &yreq.u.y_req); MPIDI_POSIX_rma_outstanding_req_enqueu(yreq, &win->dev.shm.posix); } else { /* By default issuing blocking copy for lower per-op latency. */ @@ -259,7 +259,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_do_get(void *origin_addr, if (winattr & MPIDI_WINATTR_MR_PREFERRED) { MPIR_gpu_req yreq; mpi_errno = MPIR_Ilocalcopy_gpu(target_addr, target_count, - target_datatype, 0, NULL, origin_addr, + target_datatype, 0, -1, NULL, origin_addr, origin_count, origin_datatype, 0, NULL, MPL_GPU_COPY_DIRECTION_NONE, engine_type, 1, &yreq); if (yreq.type != MPIR_NULL_REQUEST) @@ -278,9 +278,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_do_get(void *origin_addr, * increase per-op+flush overhead. */ MPIR_gpu_req yreq; yreq.type = MPIR_TYPEREP_REQUEST; - mpi_errno = MPIR_Ilocalcopy(target_addr, target_count, - target_datatype, origin_addr, origin_count, origin_datatype, - &yreq.u.y_req); + mpi_errno = MPIR_Ilocalcopy(target_addr, target_count, target_datatype, 0, -1, + origin_addr, origin_count, origin_datatype, 0, &yreq.u.y_req); MPIDI_POSIX_rma_outstanding_req_enqueu(yreq, &win->dev.shm.posix); } else { /* By default issuing blocking copy for lower per-op latency. 
*/ diff --git a/src/mpid/ch4/src/ch4_coll_impl.h b/src/mpid/ch4/src/ch4_coll_impl.h index ca626076491..2f58a927d89 100644 --- a/src/mpid/ch4/src/ch4_coll_impl.h +++ b/src/mpid/ch4/src/ch4_coll_impl.h @@ -613,7 +613,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_Allreduce_intra_composition_alpha(const void if (host_recvbuf != NULL) { recvbuf = in_recvbuf; - mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, NULL, + mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, -1, NULL, recvbuf, count, datatype, 0, &recv_attr, MPL_GPU_COPY_DIRECTION_NONE, MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH, true); @@ -662,7 +662,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_Allreduce_intra_composition_beta(const void * if (host_recvbuf != NULL) { recvbuf = in_recvbuf; - mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, NULL, + mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, -1, NULL, recvbuf, count, datatype, 0, &recv_attr, MPL_GPU_COPY_DIRECTION_NONE, MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH, true); @@ -714,7 +714,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_Allreduce_intra_composition_gamma(const void if (host_recvbuf != NULL) { recvbuf = in_recvbuf; - mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, NULL, + mpi_errno = MPIR_Localcopy_gpu(host_recvbuf, count, datatype, 0, -1, NULL, recvbuf, count, datatype, 0, &recv_attr, MPL_GPU_COPY_DIRECTION_NONE, MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH, true); From 8d135a7e082e420f66dc90f170419c22a41e334d Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 14 Aug 2024 13:50:47 -0500 Subject: [PATCH 3/6] ch4/self: implement MPIDI_Self_{send,recv}_data Support MPIR_Data in the self path. 
--- src/mpid/ch4/include/mpidpre.h | 4 +- src/mpid/ch4/src/ch4_self.c | 105 ++++++++++++++++++++++----------- src/mpid/ch4/src/ch4_self.h | 4 ++ 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/src/mpid/ch4/include/mpidpre.h b/src/mpid/ch4/include/mpidpre.h index 0b9ec6e1df9..ced39997504 100644 --- a/src/mpid/ch4/include/mpidpre.h +++ b/src/mpid/ch4/include/mpidpre.h @@ -282,9 +282,7 @@ typedef struct MPIDI_part_request { /* message queue within "self"-comms, i.e. MPI_COMM_SELF and all communicators with size of 1. */ typedef struct { - void *buf; - MPI_Aint count; - MPI_Datatype datatype; + struct MPIR_Data data; int tag; int context_id; MPIR_Request *match_req; /* for mrecv */ diff --git a/src/mpid/ch4/src/ch4_self.c b/src/mpid/ch4/src/ch4_self.c index 5b836d0a84e..09ff8276c55 100644 --- a/src/mpid/ch4/src/ch4_self.c +++ b/src/mpid/ch4/src/ch4_self.c @@ -23,22 +23,20 @@ static MPIR_Request *self_recv_queue; * req->dev.ch4.self */ -#define ENQUEUE_SELF(req_, buf_, count_, datatype_, tag_, context_id_, queue) \ +#define ENQUEUE_SELF(req_, data_, tag_, context_id_, queue) \ do { \ MPIR_Request_add_ref(req_); \ - req_->dev.ch4.self.buf = (char *) buf_; \ - req_->dev.ch4.self.count = count_; \ - req_->dev.ch4.self.datatype = datatype_; \ + memcpy(&req_->dev.ch4.self.data, data_, sizeof(MPIR_Data)); \ req_->dev.ch4.self.tag = tag_; \ req_->dev.ch4.self.context_id = context_id_; \ DL_APPEND(queue, req_); \ } while (0) -#define ENQUEUE_SELF_SEND(req, buf, count, datatype, tag, context_id) \ - ENQUEUE_SELF(req, buf, count, datatype, tag, context_id, self_send_queue) +#define ENQUEUE_SELF_SEND(req, data, tag, context_id) \ + ENQUEUE_SELF(req, data, tag, context_id, self_send_queue) -#define ENQUEUE_SELF_RECV(req, buf, count, datatype, tag, context_id) \ - ENQUEUE_SELF(req, buf, count, datatype, tag, context_id, self_recv_queue) +#define ENQUEUE_SELF_RECV(req, data, tag, context_id) \ + ENQUEUE_SELF(req, data, tag, context_id, self_recv_queue) #define 
MATCH_TAG(tag1, tag2) (tag1 == tag2 || tag1 == MPI_ANY_TAG || tag2 == MPI_ANY_TAG) #define DEQUEUE_SELF(req_, tag_, context_id_, queue) \ @@ -94,14 +92,18 @@ int MPIDI_Self_finalize(void) return MPI_SUCCESS; } -#define SELF_COPY(sendbuf, sendcnt, sendtype, recvbuf, recvcnt, recvtype, status, tag) \ +#define SELF_COPY(sendbuf, sendcnt, sendtype, sendoffset, sendlength, recvbuf, recvcnt, recvtype, recvoffset, status, tag) \ do { \ MPI_Aint sdata_sz, rdata_sz; \ MPIR_Datatype_get_size_macro(sendtype, sdata_sz); \ MPIR_Datatype_get_size_macro(recvtype, rdata_sz); \ sdata_sz *= sendcnt; \ rdata_sz *= recvcnt; \ - MPIR_Localcopy(sendbuf, sendcnt, sendtype, recvbuf, recvcnt, recvtype); \ + \ + MPIR_Typerep_req typerep_req; \ + MPIR_Ilocalcopy(sendbuf, sendcnt, sendtype, sendoffset, sendlength, recvbuf, recvcnt, recvtype, recvoffset, &typerep_req); \ + MPIR_Typerep_wait(typerep_req); \ + \ status.MPI_SOURCE = 0; \ status.MPI_TAG = tag; \ if (sdata_sz > rdata_sz) { \ @@ -117,8 +119,8 @@ int MPIDI_Self_finalize(void) } \ } while (0) -int MPIDI_Self_isend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, int tag, - MPIR_Comm * comm, int attr, MPIR_Request ** request) +int MPIDI_Self_send_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request) { MPIR_Request *sreq = NULL; MPIR_Request *rreq = NULL; @@ -127,19 +129,21 @@ int MPIDI_Self_isend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int DEQUEUE_SELF_RECV(rreq, tag, comm->context_id); if (rreq) { - SELF_COPY(buf, count, datatype, rreq->dev.ch4.self.buf, - rreq->dev.ch4.self.count, rreq->dev.ch4.self.datatype, rreq->status, tag); + MPIR_Data *rdata = &rreq->dev.ch4.self.data; + MPIR_Assert(data->length == rdata->length); + SELF_COPY(data->buf, data->count, data->datatype, data->offset, data->length, + rdata->buf, rdata->count, rdata->datatype, rdata->offset, rreq->status, tag); /* comm will be released by MPIR_Request_free(rreq) */ - 
MPIR_Datatype_release_if_not_builtin(rreq->dev.ch4.self.datatype); + MPIR_Datatype_release_if_not_builtin(rdata->datatype); MPIR_Request_complete(rreq); sreq = MPIR_Request_create_complete(MPIR_REQUEST_KIND__SEND); } else { sreq = MPIR_Request_create(MPIR_REQUEST_KIND__SEND); - ENQUEUE_SELF_SEND(sreq, buf, count, datatype, tag, comm->context_id); + ENQUEUE_SELF_SEND(sreq, data, tag, comm->context_id); MPII_UNEXPQ_REMEMBER(sreq, rank, tag, comm->context_id); sreq->comm = comm; MPIR_Comm_add_ref(comm); - MPIR_Datatype_add_ref_if_not_builtin(datatype); + MPIR_Datatype_add_ref_if_not_builtin(data->datatype); } *request = sreq; @@ -148,8 +152,23 @@ int MPIDI_Self_isend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int return MPI_SUCCESS; } -int MPIDI_Self_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, int tag, +int MPIDI_Self_isend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, int tag, MPIR_Comm * comm, int attr, MPIR_Request ** request) +{ + /* Are we concerned about this extra param copy? 
*/ + MPIR_Data data = { + .buf = (void *) buf, + .count = count, + .datatype = datatype, + .offset = 0, + .length = -1, + }; + + return MPIDI_Self_send_data(&data, rank, tag, comm, attr, request); +} + +int MPIDI_Self_recv_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request) { MPIR_Request *sreq = NULL; MPIR_Request *rreq = NULL; @@ -160,20 +179,20 @@ int MPIDI_Self_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, DEQUEUE_SELF_SEND(sreq, tag, comm->context_id); if (sreq) { - SELF_COPY(sreq->dev.ch4.self.buf, - sreq->dev.ch4.self.count, - sreq->dev.ch4.self.datatype, - buf, count, datatype, rreq->status, sreq->dev.ch4.self.tag); + MPIR_Data *sdata = &rreq->dev.ch4.self.data; + SELF_COPY(sdata->buf, sdata->count, sdata->datatype, sdata->offset, sdata->length, + data->buf, data->count, data->datatype, data->offset, + rreq->status, sreq->dev.ch4.self.tag); /* comm will be released by MPIR_Request_free(sreq) */ - MPIR_Datatype_release_if_not_builtin(sreq->dev.ch4.self.datatype); + MPIR_Datatype_release_if_not_builtin(sdata->datatype); MPIR_Request_complete(sreq); MPIR_cc_set(&rreq->cc, 0); MPII_UNEXPQ_FORGET(sreq); } else { - ENQUEUE_SELF_RECV(rreq, buf, count, datatype, tag, comm->context_id); + ENQUEUE_SELF_RECV(rreq, data, tag, comm->context_id); rreq->comm = comm; MPIR_Comm_add_ref(comm); - MPIR_Datatype_add_ref_if_not_builtin(datatype); + MPIR_Datatype_add_ref_if_not_builtin(data->datatype); } *request = rreq; @@ -182,6 +201,20 @@ int MPIDI_Self_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, return MPI_SUCCESS; } +int MPIDI_Self_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request) +{ + MPIR_Data data = { + .buf = buf, + .count = count, + .datatype = datatype, + .offset = 0, + .length = -1, + }; + + return MPIDI_Self_recv_data(&data, rank, tag, comm, attr, request); +} + int MPIDI_Self_iprobe(int rank, int tag, 
MPIR_Comm * comm, int attr, int *flag, MPI_Status * status) { MPIR_Request *sreq = NULL; @@ -192,8 +225,8 @@ int MPIDI_Self_iprobe(int rank, int tag, MPIR_Comm * comm, int attr, int *flag, if (sreq) { if (status != MPI_STATUS_IGNORE) { MPI_Aint sdata_sz; - MPIR_Datatype_get_size_macro(sreq->dev.ch4.self.datatype, sdata_sz); - sdata_sz *= sreq->dev.ch4.self.count; + MPIR_Datatype_get_size_macro(sreq->dev.ch4.self.data.datatype, sdata_sz); + sdata_sz *= sreq->dev.ch4.self.data.count; MPIR_STATUS_SET_COUNT(*status, sdata_sz); status->MPI_SOURCE = 0; status->MPI_TAG = sreq->dev.ch4.self.tag; @@ -219,8 +252,8 @@ int MPIDI_Self_improbe(int rank, int tag, MPIR_Comm * comm, int attr, if (sreq) { if (status != MPI_STATUS_IGNORE) { MPI_Aint sdata_sz; - MPIR_Datatype_get_size_macro(sreq->dev.ch4.self.datatype, sdata_sz); - sdata_sz *= sreq->dev.ch4.self.count; + MPIR_Datatype_get_size_macro(sreq->dev.ch4.self.data.datatype, sdata_sz); + sdata_sz *= sreq->dev.ch4.self.data.count; MPIR_STATUS_SET_COUNT(*status, sdata_sz); status->MPI_SOURCE = 0; status->MPI_TAG = sreq->dev.ch4.self.tag; @@ -250,12 +283,14 @@ int MPIDI_Self_imrecv(char *buf, MPI_Aint count, MPI_Datatype datatype, MPIR_Request *sreq = message->dev.ch4.self.match_req; MPIR_Request *rreq = message; rreq->kind = MPIR_REQUEST_KIND__RECV; - SELF_COPY(sreq->dev.ch4.self.buf, - sreq->dev.ch4.self.count, - sreq->dev.ch4.self.datatype, - buf, count, datatype, rreq->status, sreq->dev.ch4.self.tag); + SELF_COPY(sreq->dev.ch4.self.data.buf, + sreq->dev.ch4.self.data.count, + sreq->dev.ch4.self.data.datatype, + sreq->dev.ch4.self.data.length, + sreq->dev.ch4.self.data.offset, + buf, count, datatype, 0, rreq->status, sreq->dev.ch4.self.tag); /* comm will be released by MPIR_Request_free(sreq) */ - MPIR_Datatype_release_if_not_builtin(sreq->dev.ch4.self.datatype); + MPIR_Datatype_release_if_not_builtin(sreq->dev.ch4.self.data.datatype); MPIR_Request_complete(sreq); MPIR_cc_set(&rreq->cc, 0); @@ -291,7 +326,7 @@ int 
MPIDI_Self_cancel(MPIR_Request * request) } if (found) { /* comm will be released at MPIR_Request_free */ - MPIR_Datatype_release_if_not_builtin(request->dev.ch4.self.datatype); + MPIR_Datatype_release_if_not_builtin(request->dev.ch4.self.data.datatype); } else { goto fn_exit; } diff --git a/src/mpid/ch4/src/ch4_self.h b/src/mpid/ch4/src/ch4_self.h index 231e6b9212e..be139707f74 100644 --- a/src/mpid/ch4/src/ch4_self.h +++ b/src/mpid/ch4/src/ch4_self.h @@ -12,6 +12,10 @@ int MPIDI_Self_isend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int MPIR_Comm * comm, int attr, MPIR_Request ** request); int MPIDI_Self_irecv(void *buf, MPI_Aint count, MPI_Datatype datatype, int rank, int tag, MPIR_Comm * comm, int attr, MPIR_Request ** request); +int MPIDI_Self_send_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request); +int MPIDI_Self_recv_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request); int MPIDI_Self_iprobe(int rank, int tag, MPIR_Comm * comm, int attr, int *flag, MPI_Status * status); int MPIDI_Self_improbe(int rank, int tag, MPIR_Comm * comm, int attr, From 6b98800a7e54a8cd4e4e5b5140fe5176ca3c95f9 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 14 Aug 2024 13:51:52 -0500 Subject: [PATCH 4/6] ch4: implement MPID_{Send,Recv}_data --- src/mpid/ch4/src/ch4_recv.h | 27 +++++++++++++++++++++++++++ src/mpid/ch4/src/ch4_send.h | 28 +++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/mpid/ch4/src/ch4_recv.h b/src/mpid/ch4/src/ch4_recv.h index 6eba7cd6570..4f5e41722c8 100644 --- a/src/mpid/ch4/src/ch4_recv.h +++ b/src/mpid/ch4/src/ch4_recv.h @@ -253,6 +253,33 @@ MPL_STATIC_INLINE_PREFIX int MPID_Irecv(void *buf, goto fn_exit; } +MPL_STATIC_INLINE_PREFIX int MPID_Recv_data(MPIR_Data * data, int rank, int tag, + MPIR_Comm * comm, int attr, MPIR_Request ** request) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_FUNC_ENTER; + + if 
(MPIR_is_self_comm(comm)) { + mpi_errno = MPIDI_Self_recv_data(data, rank, tag, comm, attr, request); + } else { +#if 1 + MPIR_Assert(0); +#else + MPIDI_av_entry_t *av = (rank == MPI_ANY_SOURCE ? NULL : MPIDIU_comm_rank_to_av(comm, rank)); + mpi_errno = MPIDI_irecv_data(data, rank, tag, comm, attr, av, request); +#endif + } + + MPIR_ERR_CHECK(mpi_errno); + + MPII_RECVQ_REMEMBER(*request, rank, tag, comm->recvcontext_id, buf, count); + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} + MPL_STATIC_INLINE_PREFIX int MPID_Cancel_recv(MPIR_Request * rreq) { int mpi_errno; diff --git a/src/mpid/ch4/src/ch4_send.h b/src/mpid/ch4/src/ch4_send.h index d16d173978a..93d26a65771 100644 --- a/src/mpid/ch4/src/ch4_send.h +++ b/src/mpid/ch4/src/ch4_send.h @@ -182,7 +182,33 @@ MPL_STATIC_INLINE_PREFIX int MPID_Send_data(MPIR_Data * data, int rank, int tag, MPIDI_av_entry_t *av = NULL; MPIR_FUNC_ENTER; - MPIR_Assert(0); + if (MPIR_is_self_comm(comm)) { + mpi_errno = MPIDI_Self_send_data(data, rank, tag, comm, attr, request); + } else { +#if 1 + MPIR_Assert(0); +#else + *(request) = NULL; +#ifdef MPIDI_CH4_DIRECT_NETMOD + mpi_errno = MPIDI_NM_send_data(data, rank, tag, comm, attr, av, req); +#else + int r; + if ((r = MPIDI_av_is_local(av))) + mpi_errno = MPIDI_SHM_send_data(data, rank, tag, comm, attr, av, req); + else + mpi_errno = MPIDI_NM_send_data(data, rank, tag, comm, attr, av, req); + if (mpi_errno == MPI_SUCCESS) + MPIDI_REQUEST(*req, is_local) = r; +#endif + MPIR_ERR_CHECK(mpi_errno); +#endif + } + + MPIR_ERR_CHECK(mpi_errno); + + if (*request) { + MPII_SENDQ_REMEMBER(*request, rank, tag, comm->recvcontext_id, buf, count); + } fn_exit: MPIR_FUNC_EXIT; From 824a6f67b6537895971f37a092c4617119c9a3cd Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 15 Aug 2024 14:05:33 -0500 Subject: [PATCH 5/6] ch4: remove buf-count-datatype from am_check_eager interface The buf, count, datatype triplet are not needed in checking eager limit. 
Removing them from the interface makes it easier to later add partial datatype support via MPIR_Data. --- doc/wiki/developer_guide.md | 4 +--- src/mpid/ch4/ch4_api.txt | 4 ++-- src/mpid/ch4/netmod/ofi/ofi_am.h | 3 +-- src/mpid/ch4/netmod/ucx/ucx_am.h | 3 +-- src/mpid/ch4/shm/src/shm_am.h | 3 +-- src/mpid/ch4/src/mpidig_send.h | 11 +++++------ 6 files changed, 11 insertions(+), 17 deletions(-) diff --git a/doc/wiki/developer_guide.md b/doc/wiki/developer_guide.md index 89102ca9bbf..680980513c3 100644 --- a/doc/wiki/developer_guide.md +++ b/doc/wiki/developer_guide.md @@ -303,9 +303,7 @@ MPI_Aint MPIDI_[NM|SHM]_am_eager_limit(void) MPI_Aint MPIDI_[NM|SHM]_am_eager_buf_limit(void) /* return true/false if pt2pt message can be sent eagerly */ -bool MPIDI_[NM|SHM]_am_check_eager(MPI_Aint am_hdr_sz, MPI_Aint data_sz, - const void *data, MPI_Aint count, - MPI_Datatype datatype, MPIR_Request * sreq) +bool MPIDI_[NM|SHM]_am_check_eager(MPI_Aint am_hdr_sz, MPI_Aint data_sz, MPIR_Request * sreq) /****************** Callback APIs ******************/ diff --git a/src/mpid/ch4/ch4_api.txt b/src/mpid/ch4/ch4_api.txt index 96ace1800b8..f69e0805246 100644 --- a/src/mpid/ch4/ch4_api.txt +++ b/src/mpid/ch4/ch4_api.txt @@ -76,8 +76,8 @@ Non Native API: NM*: void SHM*: void am_check_eager: bool - NM*: am_hdr_sz, data_sz, data, count, datatype, sreq - SHM*: am_hdr_sz, data_sz, data, count, datatype, sreq + NM*: am_hdr_sz, data_sz, sreq + SHM*: am_hdr_sz, data_sz, sreq comm_get_gpid : int NM*: comm_ptr, idx, gpid_ptr, is_remote get_local_upids : int diff --git a/src/mpid/ch4/netmod/ofi/ofi_am.h b/src/mpid/ch4/netmod/ofi/ofi_am.h index 19dd1348e89..d4140e3ebf9 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_am.h +++ b/src/mpid/ch4/netmod/ofi/ofi_am.h @@ -192,8 +192,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_am_send_hdr_reply(MPIR_Comm * comm, } MPL_STATIC_INLINE_PREFIX bool MPIDI_NM_am_check_eager(MPI_Aint am_hdr_sz, MPI_Aint data_sz, - const void *data, MPI_Aint count, - MPI_Datatype datatype, MPIR_Request
* sreq) + MPIR_Request * sreq) { MPIDI_OFI_AMREQUEST(sreq, data_sz) = data_sz; if ((am_hdr_sz + data_sz) diff --git a/src/mpid/ch4/netmod/ucx/ucx_am.h b/src/mpid/ch4/netmod/ucx/ucx_am.h index 68fa6aa2d06..2155a59abe5 100644 --- a/src/mpid/ch4/netmod/ucx/ucx_am.h +++ b/src/mpid/ch4/netmod/ucx/ucx_am.h @@ -261,8 +261,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_am_send_hdr_reply(MPIR_Comm * comm, } MPL_STATIC_INLINE_PREFIX bool MPIDI_NM_am_check_eager(MPI_Aint am_hdr_sz, MPI_Aint data_sz, - const void *data, MPI_Aint count, - MPI_Datatype datatype, MPIR_Request * sreq) + MPIR_Request * sreq) { #ifdef HAVE_UCP_AM_NBX /* ucx will handle rndv */ diff --git a/src/mpid/ch4/shm/src/shm_am.h b/src/mpid/ch4/shm/src/shm_am.h index e596c647070..57f7612c423 100644 --- a/src/mpid/ch4/shm/src/shm_am.h +++ b/src/mpid/ch4/shm/src/shm_am.h @@ -110,8 +110,7 @@ MPL_STATIC_INLINE_PREFIX void MPIDI_SHM_am_request_finalize(MPIR_Request * req) } MPL_STATIC_INLINE_PREFIX bool MPIDI_SHM_am_check_eager(MPI_Aint am_hdr_sz, MPI_Aint data_sz, - const void *data, MPI_Aint count, - MPI_Datatype datatype, MPIR_Request * sreq) + MPIR_Request * sreq) { /* TODO: add checking for IPC transmission */ return (am_hdr_sz + data_sz) <= MPIDI_POSIX_am_eager_limit(); diff --git a/src/mpid/ch4/src/mpidig_send.h b/src/mpid/ch4/src/mpidig_send.h index 36a36da42c0..1ff868d4cc6 100644 --- a/src/mpid/ch4/src/mpidig_send.h +++ b/src/mpid/ch4/src/mpidig_send.h @@ -27,16 +27,15 @@ #define MPIDIG_AM_SEND_GET_RNDV_ID(flags) (flags >> 8) MPL_STATIC_INLINE_PREFIX bool MPIDIG_check_eager(int is_local, MPI_Aint am_hdr_sz, MPI_Aint data_sz, - const void *buf, MPI_Aint count, - MPI_Datatype datatype, MPIR_Request * sreq) + MPIR_Request * sreq) { #ifdef MPIDI_CH4_DIRECT_NETMOD - return MPIDI_NM_am_check_eager(am_hdr_sz, data_sz, buf, count, datatype, sreq); + return MPIDI_NM_am_check_eager(am_hdr_sz, data_sz, sreq); #else if (is_local) { - return MPIDI_SHM_am_check_eager(am_hdr_sz, data_sz, buf, count, datatype, sreq); + return 
MPIDI_SHM_am_check_eager(am_hdr_sz, data_sz, sreq); } else { - return MPIDI_NM_am_check_eager(am_hdr_sz, data_sz, buf, count, datatype, sreq); + return MPIDI_NM_am_check_eager(am_hdr_sz, data_sz, sreq); } #endif } @@ -91,7 +90,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDIG_isend_impl(const void *buf, MPI_Aint count, int is_local = MPIDI_av_is_local(addr); MPI_Aint am_hdr_sz = (MPI_Aint) sizeof(am_hdr); - if (MPIDIG_check_eager(is_local, am_hdr_sz, data_sz, buf, count, datatype, sreq)) { + if (MPIDIG_check_eager(is_local, am_hdr_sz, data_sz, sreq)) { /* EAGER send */ CH4_CALL(am_isend(rank, comm, MPIDIG_SEND, &am_hdr, am_hdr_sz, buf, count, datatype, src_vci, dst_vci, sreq), is_local, mpi_errno); From 3cc7ccb3995997fcb12d4580dc9c982c4140084b Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 15 Aug 2024 10:37:26 -0500 Subject: [PATCH 6/6] ch4: implement MPIR_Data send/recv in netmod/shm/mpidig (WIP) --- src/mpid/ch4/ch4_api.txt | 7 +++++ src/mpid/ch4/src/mpidig_send.h | 57 ++++++++++++++++++++++++---------- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/src/mpid/ch4/ch4_api.txt b/src/mpid/ch4/ch4_api.txt index f69e0805246..f53901033fd 100644 --- a/src/mpid/ch4/ch4_api.txt +++ b/src/mpid/ch4/ch4_api.txt @@ -138,12 +138,18 @@ Non Native API: SHM*: req Native API: + send_data : int + NM*: data-2, rank, tag, comm, attr-2, addr, req_p + SHM*: data-2, rank, tag, comm, attr-2, addr, req_p mpi_isend : int NM*: buf, count, datatype, rank, tag, comm, attr-2, addr, req_p SHM*: buf, count, datatype, rank, tag, comm, attr-2, addr, req_p mpi_cancel_send : int NM*: sreq SHM*: sreq + recv_data : int + NM*: data-2, rank, tag, comm, attr-2, addr, req_p, partner + SHM*: data-2, rank, tag, comm, attr-2, req_p mpi_irecv : int NM*: buf-2, count, datatype, rank, tag, comm, attr-2, addr, req_p, partner SHM*: buf-2, count, datatype, rank, tag, comm, attr-2, req_p @@ -446,6 +452,7 @@ PARAM: context_id: MPIR_Context_id_t count: MPI_Aint data: const void * + data-2: MPIR_Data * 
data_sz: MPI_Aint datatype: MPI_Datatype datatype_p: MPIR_Datatype * diff --git a/src/mpid/ch4/src/mpidig_send.h b/src/mpid/ch4/src/mpidig_send.h index 1ff868d4cc6..af1dcc8f081 100644 --- a/src/mpid/ch4/src/mpidig_send.h +++ b/src/mpid/ch4/src/mpidig_send.h @@ -40,8 +40,7 @@ MPL_STATIC_INLINE_PREFIX bool MPIDIG_check_eager(int is_local, MPI_Aint am_hdr_s #endif } -MPL_STATIC_INLINE_PREFIX int MPIDIG_isend_impl(const void *buf, MPI_Aint count, - MPI_Datatype datatype, int rank, int tag, +MPL_STATIC_INLINE_PREFIX int MPIDIG_isend_impl(MPIR_Data * data, int rank, int tag, MPIR_Comm * comm, int context_offset, MPIDI_av_entry_t * addr, uint8_t flags, int src_vci, int dst_vci, @@ -78,29 +77,35 @@ MPL_STATIC_INLINE_PREFIX int MPIDIG_isend_impl(const void *buf, MPI_Aint count, MPIR_cc_inc(sreq->cc_ptr); /* expecting SSEND_ACK */ } MPI_Aint data_sz; - MPIDI_Datatype_check_size(datatype, count, data_sz); + if (data->length > 0) { + data_sz = data->length; + } else { + MPIDI_Datatype_check_size(data->datatype, data->count, data_sz); + } am_hdr.data_sz = data_sz; am_hdr.rndv_hdr_sz = 0; #ifdef HAVE_DEBUGGER_SUPPORT - MPIDIG_REQUEST(sreq, datatype) = datatype; - MPIDIG_REQUEST(sreq, buffer) = (void *) buf; - MPIDIG_REQUEST(sreq, count) = count; + /* FIXME: partial data support in debugger */ + MPIDIG_REQUEST(sreq, datatype) = data->datatype; + MPIDIG_REQUEST(sreq, buffer) = data->buf; + MPIDIG_REQUEST(sreq, count) = data->count; #endif int is_local = MPIDI_av_is_local(addr); MPI_Aint am_hdr_sz = (MPI_Aint) sizeof(am_hdr); if (MPIDIG_check_eager(is_local, am_hdr_sz, data_sz, sreq)) { /* EAGER send */ - CH4_CALL(am_isend(rank, comm, MPIDIG_SEND, &am_hdr, am_hdr_sz, buf, count, datatype, - src_vci, dst_vci, sreq), is_local, mpi_errno); + CH4_CALL(am_isend(rank, comm, MPIDIG_SEND, &am_hdr, am_hdr_sz, + data->buf, data->count, data->datatype, src_vci, dst_vci, sreq), + is_local, mpi_errno); } else { /* RNDV send */ - MPIDIG_REQUEST(sreq, buffer) = (void *) buf; - 
MPIDIG_REQUEST(sreq, count) = count; - MPIDIG_REQUEST(sreq, datatype) = datatype; + MPIDIG_REQUEST(sreq, buffer) = data->buf; + MPIDIG_REQUEST(sreq, count) = data->count; + MPIDIG_REQUEST(sreq, datatype) = data->datatype; MPIDIG_REQUEST(sreq, u.send.dest) = rank; - MPIR_Datatype_add_ref_if_not_builtin(datatype); + MPIR_Datatype_add_ref_if_not_builtin(data->datatype); MPIDIG_AM_SEND_SET_RNDV(am_hdr.flags, MPIDIG_RNDV_GENERIC); CH4_CALL(am_send_hdr(rank, comm, MPIDIG_SEND, &am_hdr, am_hdr_sz, src_vci, dst_vci), @@ -115,10 +120,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDIG_isend_impl(const void *buf, MPI_Aint count, goto fn_exit; } -MPL_STATIC_INLINE_PREFIX int MPIDIG_mpi_isend(const void *buf, - MPI_Aint count, - MPI_Datatype datatype, - int rank, +MPL_STATIC_INLINE_PREFIX int MPIDIG_send_data(MPIR_Data * data, int rank, int tag, MPIR_Comm * comm, int context_offset, MPIDI_av_entry_t * addr, int src_vci, int dst_vci, MPIR_Request ** request, @@ -128,13 +130,34 @@ MPL_STATIC_INLINE_PREFIX int MPIDIG_mpi_isend(const void *buf, MPIR_FUNC_ENTER; uint8_t flags = syncflag ? 
MPIDIG_AM_SEND_FLAGS_SYNC : MPIDIG_AM_SEND_FLAGS_NONE; - mpi_errno = MPIDIG_isend_impl(buf, count, datatype, rank, tag, comm, context_offset, addr, + mpi_errno = MPIDIG_isend_impl(data, rank, tag, comm, context_offset, addr, flags, src_vci, dst_vci, request, errflag); MPIR_FUNC_EXIT; return mpi_errno; } +MPL_STATIC_INLINE_PREFIX int MPIDIG_mpi_isend(const void *buf, + MPI_Aint count, + MPI_Datatype datatype, + int rank, + int tag, MPIR_Comm * comm, int context_offset, + MPIDI_av_entry_t * addr, int src_vci, int dst_vci, + MPIR_Request ** request, + bool syncflag, MPIR_Errflag_t errflag) +{ + MPIR_Data data = { + .buf = (void *) buf, + .count = count, + .datatype = datatype, + .offset = 0, + .length = -1, + }; + + return MPIDIG_send_data(&data, rank, tag, comm, context_offset, addr, src_vci, dst_vci, + request, syncflag, errflag); +} + MPL_STATIC_INLINE_PREFIX int MPIDIG_mpi_cancel_send(MPIR_Request * sreq) { int mpi_errno = MPI_SUCCESS;