Skip to content

Commit

Permalink
Add timers to measure cost of intra-node aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
wkliao committed Nov 11, 2024
1 parent ae60975 commit 19c5a77
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 8 deletions.
11 changes: 7 additions & 4 deletions src/drivers/ncmpio/ncmpio_NC.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,9 @@ struct NC {
int my_aggr; /* rank ID of my aggregator */
int num_nonaggrs; /* number of non-aggregators assigned */
int *nonaggr_ranks; /* ranks of assigned non-aggregators */
#ifdef PNETCDF_PROFILING
double aggr_time;
#endif
};

#define NC_readonly(ncp) fIsSet((ncp)->flags, NC_MODE_RDONLY)
Expand Down Expand Up @@ -647,11 +650,11 @@ extern int
ncmpio_intra_node_aggr_init(NC *ncp);

extern int
ncmpio_intra_node_aggregation_nreqs(NC *ncp, int num_reqs, NC_req *put_list,
MPI_Offset newnumrecs);
ncmpio_intra_node_aggregation_nreqs(NC *ncp, int mode, int num_reqs,
NC_req *put_list, MPI_Offset newnumrecs);
extern int
ncmpio_intra_node_aggregation(NC *ncp, NC_var *varp, const MPI_Offset *start,
const MPI_Offset *count,
ncmpio_intra_node_aggregation(NC *ncp, int mode, NC_var *varp,
const MPI_Offset *start, const MPI_Offset *count,
const MPI_Offset *stride, MPI_Offset bufCount,
MPI_Datatype bufType, void *buf);

Expand Down
3 changes: 2 additions & 1 deletion src/drivers/ncmpio/ncmpio_getput.m4
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ err_check:
if (fIsSet(reqMode, NC_REQ_COLL) && ncp->my_aggr >= 0 && ncp->nprocs > 1) {
/* intra-node write aggregation must be in collective mode */
void *wbuf = (nbytes == 0) ? NULL : xbuf;
err = ncmpio_intra_node_aggregation(ncp, varp, start, count, stride, nelems, xtype, wbuf);
err = ncmpio_intra_node_aggregation(ncp, NC_REQ_WR, varp, start, count,
stride, nelems, xtype, wbuf);
if (status == NC_NOERR) status = err;
}
else {
Expand Down
72 changes: 70 additions & 2 deletions src/drivers/ncmpio/ncmpio_intra_node.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
/*
* Copyright (C) 2024, Northwestern University and Argonne National Laboratory
* See COPYRIGHT notice in top-level directory.
*
* This file contains the implementation of the intra-node aggregation feature,
* which is designed for I/O patterns that contain many noncontiguous requests
* interleaved among processes and spread across a wide range of file space.
* It is particularly useful when the number of MPI processes allocated to a
* compute node is large.
*
* This feature is enabled by setting the PnetCDF hint 'nc_num_aggrs_per_node'
* to a positive integral value indicating the desired number of processes per
* compute node to be selected as the intra-node I/O aggregators. Each process
* is assigned exactly one aggregator. The non-aggregators send their requests
* to their assigned aggregators, and the aggregators then make the MPI-IO
* requests to the file.
*
* Such a strategy can effectively reduce the communication congestion caused
* by the many pending asynchronous messages produced by collective writes
* inside MPI-IO.
*
* The concept of intra-node request aggregation is based on the paper:
* Q. Kang, S. Lee, K. Hou, R. Ross, A. Agrawal, A. Choudhary, and W. Liao.
* Improving MPI Collective I/O for High Volume Non-Contiguous Requests With
* Intra-Node Aggregation. IEEE Transactions on Parallel and Distributed
* Systems (TPDS), 31(11):2682-2695, November 2020.
*/

#ifdef HAVE_CONFIG_H
Expand Down Expand Up @@ -48,6 +71,11 @@
((*(b) < *(c)) ? (b) : ((*(a) < *(c)) ? (c) : (a))) : \
((*(b) > *(c)) ? (b) : ((*(a) < *(c)) ? (a) : (c))))

/*----< qsort_off_len_buf() >------------------------------------------------*/
/* Sort three arrays of offsets, lengths, and buffer addresses in increasing
* order of offsets. This code is based on the qsort routine from Bentley &
* McIlroy's "Engineering a Sort Function".
*/
static void
qsort_off_len_buf(MPI_Aint num,
#ifdef HAVE_MPI_LARGE_COUNT
Expand Down Expand Up @@ -125,7 +153,7 @@ qsort_off_len_buf(MPI_Aint num,
if ((r = pb - pa) > 1)
qsort_off_len_buf(r, offsets, lengths, bufAddr);
if ((r = pd - pc) > 1) {
/* Iterate rather than recurse to save stack space */
/* Iterate rather than recursively call self to save stack space */
lengths = lengths + (num - r);
bufAddr = bufAddr + (num - r);
offsets = pn - r;
Expand Down Expand Up @@ -161,10 +189,18 @@ ncmpio_intra_node_aggr_init(NC *ncp)
ncp->num_nonaggrs = 0; /* number of non-aggregators assigned */
ncp->nonaggr_ranks = NULL; /* ranks of assigned non-aggregators */

#ifdef PNETCDF_PROFILING
ncp->aggr_time = 0.0;
#endif

if (ncp->num_aggrs_per_node == 0 || ncp->num_aggrs_per_node == ncp->nprocs)
/* disable intra-node aggregation */
return NC_NOERR;

#ifdef PNETCDF_PROFILING
double timing = MPI_Wtime();
#endif

/* allocate space for storing the rank IDs of non-aggregators assigned to
* this rank. Note ncp->nonaggr_ranks[] will be freed when closing the
* file, if allocated.
Expand Down Expand Up @@ -368,6 +404,10 @@ ncmpio_intra_node_aggr_init(NC *ncp)
* of processes are allocated on the same node.
*/

#ifdef PNETCDF_PROFILING
ncp->aggr_time = MPI_Wtime() - timing;
#endif

return NC_NOERR;
}

Expand Down Expand Up @@ -791,7 +831,9 @@ intra_node_aggregation(NC *ncp,
MPI_Datatype recvTypes, fileType=MPI_BYTE;
MPI_File fh;
MPI_Request *req=NULL;

#ifdef PNETCDF_PROFILING
double timing = MPI_Wtime();
#endif
#ifdef HAVE_MPI_LARGE_COUNT
MPI_Count bufLen;
MPI_Type_size_c(bufType, &bufLen);
Expand Down Expand Up @@ -1190,6 +1232,10 @@ intra_node_aggregation(NC *ncp,
NCI_Free(lengths);
}

#ifdef PNETCDF_PROFILING
ncp->aggr_time += MPI_Wtime() - timing;
#endif

if (ncp->rank != ncp->my_aggr) /* non-aggregator writes nothing */
buf_count = 0;

Expand Down Expand Up @@ -1221,6 +1267,7 @@ intra_node_aggregation(NC *ncp,
/* This is a collective call */
int
ncmpio_intra_node_aggregation_nreqs(NC *ncp,
int reqMode,
int num_reqs,
NC_req *put_list,
MPI_Offset newnumrecs)
Expand All @@ -1234,6 +1281,12 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
int *lengths=NULL;
#endif
MPI_Datatype bufType=MPI_BYTE;
#ifdef PNETCDF_PROFILING
double timing = MPI_Wtime();
#endif

/* currently supports write requests only */
if (fIsSet(reqMode, NC_REQ_RD)) return NC_NOERR;

assert(ncp->my_aggr >= 0);

Expand All @@ -1260,6 +1313,10 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
if (put_list != NULL)
NCI_Free(put_list);

#ifdef PNETCDF_PROFILING
ncp->aggr_time += MPI_Wtime() - timing;
#endif

err = intra_node_aggregation(ncp, num_pairs, offsets, lengths, bufLen,
bufType, NULL);
if (status == NC_NOERR) status = err;
Expand Down Expand Up @@ -1291,6 +1348,7 @@ ncmpio_intra_node_aggregation_nreqs(NC *ncp,
/* This is a collective call */
int
ncmpio_intra_node_aggregation(NC *ncp,
int reqMode,
NC_var *varp,
const MPI_Offset *start,
const MPI_Offset *count,
Expand All @@ -1307,6 +1365,12 @@ ncmpio_intra_node_aggregation(NC *ncp,
MPI_Aint *offsets=NULL;
int *lengths=NULL;
#endif
#ifdef PNETCDF_PROFILING
double timing = MPI_Wtime();
#endif

/* currently supports write requests only */
if (fIsSet(reqMode, NC_REQ_RD)) return NC_NOERR;

if (buf == NULL) /* zero-length request */
return intra_node_aggregation(ncp, 0, NULL, NULL, 0, MPI_BYTE, NULL);
Expand All @@ -1326,6 +1390,10 @@ ncmpio_intra_node_aggregation(NC *ncp,
}
status = err;

#ifdef PNETCDF_PROFILING
ncp->aggr_time += MPI_Wtime() - timing;
#endif

err = intra_node_aggregation(ncp, num_pairs, offsets, lengths, bufCount,
bufType, buf);
if (status == NC_NOERR) status = err;
Expand Down
4 changes: 3 additions & 1 deletion src/drivers/ncmpio/ncmpio_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,9 @@ req_commit(NC *ncp,

if (ncp->my_aggr >= 0 && coll_indep == NC_REQ_COLL && ncp->nprocs > 1)
/* intra-node write aggregation must be in collective mode */
err = ncmpio_intra_node_aggregation_nreqs(ncp, num_w_reqs, put_list, newnumrecs);
err = ncmpio_intra_node_aggregation_nreqs(ncp, NC_REQ_WR,
num_w_reqs, put_list,
newnumrecs);
else
err = wait_getput(ncp, num_w_reqs, put_list, NC_REQ_WR, coll_indep,
newnumrecs);
Expand Down

0 comments on commit 19c5a77

Please sign in to comment.