From 546988856e4f1c3d55d3308da3581dfa86fb08fa Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Tue, 7 Jan 2025 04:56:33 -0800 Subject: [PATCH 1/4] WIP: - introducing chunked all-to-all collective communication in order to work around 2GB MPI limit; - some refactoring of read_projection_datasets in order to avoid haphazard increments of end ranges; --- include/cell/cell_attributes.hh | 14 +- include/data/chunk_info.hh | 69 ++++++++ include/data/serialize_cell_attributes.hh | 10 +- include/data/serialize_edge.hh | 10 +- include/data/serialize_tree.hh | 12 +- include/graph/node_attributes.hh | 5 +- include/mpi/allgatherv_template.hh | 148 ++++++++++++++++++ include/mpi/alltoallv_template.hh | 109 ++++++++----- python/neuroh5/iomodule.cc | 6 +- src/cell/append_tree.cc | 4 +- src/cell/cell_attributes.cc | 38 +---- src/cell/scatter_read_tree.cc | 6 +- src/data/append_rank_edge_map.cc | 24 ++- src/data/serialize_cell_attributes.cc | 28 ++-- src/data/serialize_edge.cc | 26 +-- src/data/serialize_tree.cc | 38 ++--- src/graph/append_graph.cc | 36 +---- src/graph/node_attributes.cc | 18 +-- src/graph/scatter_read_projection.cc | 19 ++- .../scatter_read_projection_selection.cc | 6 +- src/graph/write_graph.cc | 40 ++--- src/hdf5/read_projection_datasets.cc | 148 +++++++++++++++--- 22 files changed, 548 insertions(+), 266 deletions(-) create mode 100644 include/data/chunk_info.hh create mode 100644 include/mpi/allgatherv_template.hh diff --git a/include/cell/cell_attributes.hh b/include/cell/cell_attributes.hh index 2456b3e..9e56f2a 100644 --- a/include/cell/cell_attributes.hh +++ b/include/cell/cell_attributes.hh @@ -466,7 +466,7 @@ namespace neuroh5 vector gid_recvbuf; { - vector idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0); + vector idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0); idx_sendcounts[io_dests[rank]] = local_index_vector.size(); throw_assert(mpi::alltoallv_vector(comm, MPI_CELL_IDX_T, @@ -478,7 +478,7 @@ namespace neuroh5 vector attr_ptr; { vector attr_size_recvbuf; - vector attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0); + vector attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0); attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size(); throw_assert(mpi::alltoallv_vector(comm, MPI_ATTR_PTR_T, @@ -522,7 +522,7 @@ namespace neuroh5 local_value_vector.insert(local_value_vector.end(),v.begin(),v.end()); } - vector value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0); + vector value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0); value_sendcounts[io_dests[rank]] = local_value_size; @@ -902,7 +902,7 @@ namespace neuroh5 vector gid_recvbuf; { - vector idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0); + vector idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0); idx_sendcounts[io_dests[rank]] = local_index_vector.size(); throw_assert(mpi::alltoallv_vector(comm, MPI_CELL_IDX_T, @@ -914,7 +914,7 @@ namespace neuroh5 vector attr_ptr; { vector attr_size_recvbuf; - vector attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0); + vector attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), 
attr_size_rdispls(size, 0); attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size(); throw_assert(mpi::alltoallv_vector(comm, MPI_ATTR_PTR_T, @@ -925,7 +925,7 @@ namespace neuroh5 if ((is_io_rank) && (attr_size_recvbuf.size() > 0)) { ATTR_PTR_T attr_ptr_offset = 0; - for (size_t s=0; s value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0); + vector value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0); value_sendcounts[io_dests[rank]] = local_value_size; diff --git a/include/data/chunk_info.hh b/include/data/chunk_info.hh new file mode 100644 index 0000000..1675d94 --- /dev/null +++ b/include/data/chunk_info.hh @@ -0,0 +1,69 @@ +#ifndef CHUNK_INFO_HH +#define CHUNK_INFO_HH + +#include +#include + +using namespace std; + +namespace neuroh5 +{ + namespace data + { + + // Constants for chunking + constexpr size_t CHUNK_SIZE = 1ULL << 30; // 1GB chunks for safety margin + + template + struct ChunkInfo { + std::vector sendcounts; + std::vector sdispls; + std::vector recvcounts; + std::vector rdispls; + size_t total_send_size; + size_t total_recv_size; + }; + + + template + ChunkInfo calculate_chunk_sizes( + const std::vector& full_sendcounts, + const std::vector& full_sdispls, + const std::vector& full_recvcounts, + const std::vector& full_rdispls, + size_t chunk_start, + size_t chunk_size) + { + const size_t size = full_sendcounts.size(); + ChunkInfo chunk; + chunk.sendcounts.resize(size); + chunk.sdispls.resize(size); + chunk.recvcounts.resize(size); + chunk.rdispls.resize(size); + + chunk.total_send_size = 0; + chunk.total_recv_size = 0; + + for (size_t i = 0; i < size; ++i) { + // Calculate how much data to send in this chunk + size_t send_remaining = (chunk_start < full_sendcounts[i]) ? + full_sendcounts[i] - chunk_start : 0; + chunk.sendcounts[i] = static_cast(std::min(send_remaining, chunk_size)); + chunk.sdispls[i] = static_cast(full_sdispls[i] + chunk_start); + chunk.total_send_size += chunk.sendcounts[i]; + + // Calculate how much data to receive in this chunk + size_t recv_remaining = (chunk_start < full_recvcounts[i]) ? + full_recvcounts[i] - chunk_start : 0; + chunk.recvcounts[i] = static_cast(std::min(recv_remaining, chunk_size)); + chunk.rdispls[i] = static_cast(full_rdispls[i] + chunk_start); + chunk.total_recv_size += chunk.recvcounts[i]; + } + + return chunk; + } + + + } +} +#endif diff --git a/include/data/serialize_cell_attributes.hh b/include/data/serialize_cell_attributes.hh index 0cc6ef2..69787f7 100644 --- a/include/data/serialize_cell_attributes.hh +++ b/include/data/serialize_cell_attributes.hh @@ -4,7 +4,7 @@ /// /// Functions for serializing cell attributes. /// -/// Copyright (C) 2017-2020 Project Neuroh5. +/// Copyright (C) 2017-2024 Project Neuroh5. 
//============================================================================== #ifndef SERIALIZE_CELL_ATTRIBUTES_HH @@ -27,14 +27,14 @@ namespace neuroh5 void serialize_rank_attr_map (const size_t num_ranks, const size_t start_rank, const map & rank_attr_map, - std::vector& sendcounts, + std::vector& sendcounts, std::vector &sendbuf, - std::vector &sdispls); + std::vector &sdispls); void deserialize_rank_attr_map (const size_t num_ranks, const std::vector &recvbuf, - const std::vector& recvcounts, - const std::vector& rdispls, + const std::vector& recvcounts, + const std::vector& rdispls, AttrMap& all_attr_map); diff --git a/include/data/serialize_edge.hh b/include/data/serialize_edge.hh index 1ae962c..de06a58 100644 --- a/include/data/serialize_edge.hh +++ b/include/data/serialize_edge.hh @@ -4,7 +4,7 @@ /// /// Functions for serializing edge data. /// -/// Copyright (C) 2017 Project Neuroh5. +/// Copyright (C) 2017-2024 Project Neuroh5. //============================================================================== #ifndef SERIALIZE_EDGE_HH @@ -31,14 +31,14 @@ namespace neuroh5 const size_t start_rank, const rank_edge_map_t& prj_rank_edge_map, size_t &num_packed_edges, - vector& sendcounts, + vector& sendcounts, vector &sendbuf, - vector &sdispls); + vector &sdispls); void deserialize_rank_edge_map (const size_t num_ranks, const vector &recvbuf, - const vector& recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, edge_map_t& prj_edge_map, size_t& num_unpacked_nodes, size_t& num_unpacked_edges diff --git a/include/data/serialize_tree.hh b/include/data/serialize_tree.hh index 307d5e4..e891885 100644 --- a/include/data/serialize_tree.hh +++ b/include/data/serialize_tree.hh @@ -26,21 +26,21 @@ namespace neuroh5 void serialize_rank_tree_map (const size_t num_ranks, const size_t start_rank, const std::map >& rank_tree_map, - std::vector& sendcounts, + std::vector& sendcounts, std::vector &sendbuf, - std::vector &sdispls); + std::vector &sdispls); void deserialize_rank_tree_map (const size_t num_ranks, const std::vector &recvbuf, - const std::vector& recvcounts, - const std::vector& rdispls, + const std::vector& recvcounts, + const std::vector& rdispls, std::map &all_tree_map ); void deserialize_rank_tree_list (const size_t num_ranks, const vector &recvbuf, - const vector& recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, forward_list &all_tree_list); } } diff --git a/include/graph/node_attributes.hh b/include/graph/node_attributes.hh index 805b90e..b470f36 100644 --- a/include/graph/node_attributes.hh +++ b/include/graph/node_attributes.hh @@ -213,6 +213,9 @@ namespace neuroh5 value_offset = value_offset + v.size(); } //attr_ptr.push_back(value_offset); + + T dummy; + MPI_Datatype mpi_type = infer_mpi_datatype(dummy); vector value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0); value_sendcounts[io_dests[rank]] = local_value_size; @@ -241,8 +244,6 @@ namespace neuroh5 //assert(recvbuf_size > 0); vector value_recvbuf(value_recvbuf_size); - T dummy; - MPI_Datatype mpi_type = infer_mpi_datatype(dummy); throw_assert(MPI_Alltoallv(&value_vector[0], &value_sendcounts[0], &value_sdispls[0], mpi_type, &value_recvbuf[0], &value_recvcounts[0], &value_rdispls[0], mpi_type, comm) == MPI_SUCCESS, "error in MPI_Alltoallv"); diff --git a/include/mpi/allgatherv_template.hh b/include/mpi/allgatherv_template.hh new file mode 100644 index 0000000..d201fb8 --- /dev/null +++ 
b/include/mpi/allgatherv_template.hh @@ -0,0 +1,148 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- +//============================================================================== +/// @file allgatherv_template.hh +/// +/// Function for sending data via MPI Allgatherv. +/// +/// Copyright (C) 2017-2024 Project NeuroH5. +//============================================================================== + +#ifndef ALLGATHERV_TEMPLATE_HH +#define ALLGATHERV_TEMPLATE_HH + +#include + +#include +#include + +#include "mpi_debug.hh" +#include "throw_assert.hh" +#include "neuroh5_types.hh" +#include "attr_map.hh" +#include "chunk_info.hh" + +using namespace std; + + +namespace neuroh5 +{ + + namespace mpi + { + + template + int allgatherv_vector (MPI_Comm comm, + const MPI_Datatype datatype, + size_t sendcount, + const vector& sendbuf, + vector& recvcounts, + vector& rdispls, + vector& recvbuf) + { + int ssize; size_t size; + throw_assert(MPI_Comm_size(comm, &ssize) == MPI_SUCCESS, + "allgatherv: unable to obtain size of MPI communicator"); + throw_assert_nomsg(ssize > 0); + size = ssize; + + // Exchange counts + + /*************************************************************************** + * Send MPI data with Allgatherv + **************************************************************************/ + recvcounts.resize(size,0); + rdispls.resize(size,0); + + // 1. Each ALL_COMM rank sends a data size to every other rank and + // creates sendcounts and sdispls arrays + + { + int status; + MPI_Request request; + + status = MPI_Iallgather(&sendcount, 1, MPI_SIZE_T, + &recvcounts[0], 1, MPI_SIZE_T, comm, + &request); + throw_assert(status == MPI_SUCCESS, + "allgatherv: error in MPI_Iallgather: status: " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "allgatherv: error in MPI_Wait: status: " << status); + + } + + // 2. Each rank accumulates the vector sizes and allocates + // a receive buffer, recvcounts, and rdispls + rdispls[0] = 0; + size_t recvbuf_size = recvcounts[0]; + for (size_t p = 1; p < size; ++p) + { + rdispls[p] = rdispls[p-1] + recvcounts[p-1]; + recvbuf_size += recvcounts[p]; + } + + //assert(recvbuf_size > 0); + recvbuf.resize(recvbuf_size, 0); + + { + int status; + MPI_Request request; + + // 3. Perform the actual data exchange in chunks + size_t chunk_start = 0; + while (true) + { + size_t global_remaining = 0; + size_t remaining = 0; + if (sendcount > chunk_start) + { + remaining = sendcount - chunk_start; + } + + status = MPI_Iallreduce(&remaining, + &global_remaining, + 1, MPI_SIZE_T, MPI_SUM, + comm, &request); + + throw_assert (status == MPI_SUCCESS, "error in MPI_Iallreduce: status = " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "allgatherv: error in MPI_Wait: status: " << status); + + if (global_remaining == 0) + break; + + size_t current_chunk = (chunk_start < sendcount) ? std::min(data::CHUNK_SIZE, sendcount - chunk_start) : 0; + + std::vector chunk_recvcounts(size); + std::vector chunk_displs(size); + + for (rank_t i = 0; i < size; ++i) { + chunk_recvcounts[i] = static_cast( + ((chunk_start < recvcounts[i]) ? 
(recvcounts[i] - chunk_start) : 0)); + chunk_displs[i] = static_cast(rdispls[i] + chunk_start); + } + + + status = MPI_Iallgatherv(&sendbuf[chunk_start], + static_cast(current_chunk), + datatype, + &recvbuf[0], + &chunk_recvcounts[0], + &chunk_displs[0], + datatype, + comm, &request); + throw_assert (status == MPI_SUCCESS, "error in MPI_Alltoallv: status = " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "allgatherv: error in MPI_Wait: status: " << status); + chunk_start = chunk_start + current_chunk; + } + } + + return MPI_SUCCESS; + } + } +} + +#endif diff --git a/include/mpi/alltoallv_template.hh b/include/mpi/alltoallv_template.hh index 783ffa8..5b4d7bc 100644 --- a/include/mpi/alltoallv_template.hh +++ b/include/mpi/alltoallv_template.hh @@ -4,7 +4,7 @@ /// /// Function for sending data via MPI Alltoallv. /// -/// Copyright (C) 2017-2022 Project NeuroH5. +/// Copyright (C) 2017-2024 Project NeuroH5. //============================================================================== #ifndef ALLTOALLV_TEMPLATE_HH @@ -19,6 +19,7 @@ #include "throw_assert.hh" #include "neuroh5_types.hh" #include "attr_map.hh" +#include "chunk_info.hh" using namespace std; @@ -28,25 +29,21 @@ namespace neuroh5 namespace mpi { - + template int alltoallv_vector (MPI_Comm comm, const MPI_Datatype datatype, - const vector& sendcounts, - const vector& sdispls, + const vector& sendcounts, + const vector& sdispls, const vector& sendbuf, - vector& recvcounts, - vector& rdispls, + vector& recvcounts, + vector& rdispls, vector& recvbuf) { - int srank, ssize; size_t rank, size; + int ssize; size_t size; throw_assert(MPI_Comm_size(comm, &ssize) == MPI_SUCCESS, "alltoallv: unable to obtain size of MPI communicator"); - throw_assert(MPI_Comm_rank(comm, &srank) == MPI_SUCCESS, - "alltoallv: unable to obtain rank of MPI communicator"); - throw_assert_nomsg(srank >= 0); throw_assert_nomsg(ssize > 0); - rank = srank; size = ssize; @@ -62,17 +59,15 @@ namespace neuroh5 { int status; MPI_Request request; - status = MPI_Ialltoall(&sendcounts[0], 1, MPI_INT, - &recvcounts[0], 1, MPI_INT, comm, &request); + status = MPI_Ialltoall(&sendcounts[0], 1, MPI_SIZE_T, + &recvcounts[0], 1, MPI_SIZE_T, + comm, &request); throw_assert(status == MPI_SUCCESS, - "alltoallv: error in MPI_Alltoallv: status: " << status); - + "alltoallv: error in MPI_Ialltoall: status: " << status); status = MPI_Wait(&request, MPI_STATUS_IGNORE); throw_assert(status == MPI_SUCCESS, "alltoallv: error in MPI_Wait: status: " << status); - //throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, - // "alltoallv: error in MPI_Barrier"); } // 2. Each rank accumulates the vector sizes and allocates @@ -88,37 +83,65 @@ namespace neuroh5 //assert(recvbuf_size > 0); recvbuf.resize(recvbuf_size, 0); - size_t global_recvbuf_size=0; { int status; MPI_Request request; - status = MPI_Iallreduce(&recvbuf_size, &global_recvbuf_size, 1, MPI_SIZE_T, MPI_SUM, - comm, &request); - throw_assert (status == MPI_SUCCESS, "error in MPI_Iallreduce: status = " << status); - status = MPI_Wait(&request, MPI_STATUS_IGNORE); - throw_assert(status == MPI_SUCCESS, - "alltoallv: error in MPI_Wait: status: " << status); - //throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, - // "alltoallv: error in MPI_Barrier"); + + // 3. 
Perform the actual data exchange in chunks + size_t chunk_start = 0; + while (true) + { + size_t global_send_size=0; + size_t global_recv_size=0; + + auto chunk = data::calculate_chunk_sizes( + sendcounts, sdispls, recvcounts, rdispls, + chunk_start, data::CHUNK_SIZE); + + status = MPI_Iallreduce(&chunk.total_recv_size, + &global_recv_size, + 1, MPI_SIZE_T, MPI_SUM, + comm, &request); + + throw_assert (status == MPI_SUCCESS, "error in MPI_Iallreduce: status = " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "alltoallv: error in MPI_Wait: status: " << status); + + status = MPI_Iallreduce(&chunk.total_send_size, + &global_send_size, + 1, MPI_SIZE_T, MPI_SUM, + comm, &request); + + throw_assert (status == MPI_SUCCESS, "error in MPI_Iallreduce: status = " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "alltoallv: error in MPI_Wait: status: " << status); + + if (global_send_size == 0 && + global_recv_size == 0) + break; + + status = MPI_Ialltoallv(&sendbuf[0], + &chunk.sendcounts[0], + &chunk.sdispls[0], + datatype, + &recvbuf[0], + &chunk.recvcounts[0], + &chunk.rdispls[0], + datatype, + comm, &request); + throw_assert (status == MPI_SUCCESS, "error in MPI_Alltoallv: status = " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "alltoallv: error in MPI_Wait: status: " << status); + + chunk_start += data::CHUNK_SIZE; + + } } - if (global_recvbuf_size > 0) - { - int status; - MPI_Request request; - - // 3. Each ALL_COMM rank participates in the MPI_Alltoallv - status = MPI_Ialltoallv(&sendbuf[0], &sendcounts[0], &sdispls[0], datatype, - &recvbuf[0], &recvcounts[0], &rdispls[0], datatype, - comm, &request); - throw_assert (status == MPI_SUCCESS, "error in MPI_Alltoallv: status = " << status); - status = MPI_Wait(&request, MPI_STATUS_IGNORE); - throw_assert(status == MPI_SUCCESS, - "alltoallv: error in MPI_Wait: status: " << status); - //throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, - // "alltoallv: error in MPI_Barrier"); - } - return 0; + return MPI_SUCCESS; } } } diff --git a/python/neuroh5/iomodule.cc b/python/neuroh5/iomodule.cc index 06764d3..45e282f 100644 --- a/python/neuroh5/iomodule.cc +++ b/python/neuroh5/iomodule.cc @@ -2769,6 +2769,7 @@ static void NeuroH5EdgeIter_dealloc(PyNeuroH5EdgeIterState *py_state) PyObject* NeuroH5EdgeIter_iternext(PyObject *self) { PyNeuroH5EdgeIterState *py_state = (PyNeuroH5EdgeIterState *)self; + if (py_state->state->it_edge != py_state->state->edge_map.cend()) { const NODE_IDX_T key = py_state->state->it_edge->first; @@ -2841,6 +2842,7 @@ NeuroH5EdgeIter_FromMap(const edge_map_t& prj_edge_map, return NULL; } + p->state = new NeuroH5EdgeIterState(); p->state->seq_index = 0; @@ -2848,7 +2850,7 @@ NeuroH5EdgeIter_FromMap(const edge_map_t& prj_edge_map, p->state->edge_map = std::move(prj_edge_map); p->state->edge_attr_name_spaces = edge_attr_name_spaces; p->state->it_edge = p->state->edge_map.cbegin(); - + return (PyObject *)p; } @@ -2933,7 +2935,7 @@ extern "C" for (size_t i = 0; i < prj_vector.size(); i++) { - edge_map_t prj_edge_map = prj_vector[i]; + edge_map_t prj_edge_map = std::move(prj_vector[i]); PyObject *py_edge_iter = NeuroH5EdgeIter_FromMap(prj_edge_map, edge_attr_name_spaces); diff --git a/src/cell/append_tree.cc b/src/cell/append_tree.cc index 4511ec8..0b43375 100644 --- a/src/cell/append_tree.cc +++ b/src/cell/append_tree.cc @@ -332,8 +332,8 @@ namespace neuroh5 std::forward_list 
local_tree_list; { std::vector sendbuf; - std::vector sendcounts, sdispls; - std::vector recvcounts, rdispls; + std::vector sendcounts, sdispls; + std::vector recvcounts, rdispls; std::vector recvbuf; rank_t dst_rank = io_dests[rank]; diff --git a/src/cell/cell_attributes.cc b/src/cell/cell_attributes.cc index 04422ee..dfc8a92 100644 --- a/src/cell/cell_attributes.cc +++ b/src/cell/cell_attributes.cc @@ -964,7 +964,7 @@ namespace neuroh5 throw_assert_nomsg(io_size > 0); vector sendbuf; - vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); + vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); set io_rank_set; data::range_sample(size, io_size, io_rank_set); @@ -1059,31 +1059,13 @@ namespace neuroh5 attr_map.insert_name(attr_names[data::AttrMap::attr_index_int32][i]); } - // 6. Each ALL_COMM rank sends an attribute set size to - // every other ALL_COMM rank (non IO_COMM ranks pass zero) - - throw_assert_nomsg(MPI_Alltoall(&sendcounts[0], 1, MPI_INT, - &recvcounts[0], 1, MPI_INT, all_comm) >= 0); - // 7. Each ALL_COMM rank accumulates the vector sizes and allocates // a receive buffer, recvcounts, and rdispls - size_t recvbuf_size; vector recvbuf; - recvbuf_size = recvcounts[0]; - for (int p = 1; p < ssize; ++p) - { - rdispls[p] = rdispls[p-1] + recvcounts[p-1]; - recvbuf_size += recvcounts[p]; - } - if (recvbuf_size > 0) - recvbuf.resize(recvbuf_size); - // 8. Each ALL_COMM rank participates in the MPI_Alltoallv throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, recvcounts, rdispls, recvbuf) >= 0); - - sendbuf.clear(); sendbuf.shrink_to_fit(); @@ -1616,7 +1598,7 @@ namespace neuroh5 } - vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); + vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); vector sendbuf; vector< size_t > num_attrs; @@ -1752,25 +1734,11 @@ namespace neuroh5 attr_values.insert_name(attr_names[data::AttrMap::attr_index_int32][i]); } - // 6. Each rank sends an attribute set size to - // every other rank (non IO_COMM ranks pass zero) - throw_assert_nomsg(MPI_Alltoall(&sendcounts[0], 1, MPI_INT, - &recvcounts[0], 1, MPI_INT, comm) == MPI_SUCCESS); // 7. Each COMM rank accumulates the vector sizes and allocates // a receive buffer, recvcounts, and rdispls - size_t recvbuf_size; vector recvbuf; - - recvbuf_size = recvcounts[0]; - for (rank_t p = 1; p < size; ++p) - { - rdispls[p] = rdispls[p-1] + recvcounts[p-1]; - recvbuf_size += recvcounts[p]; - } - if (recvbuf_size > 0) - recvbuf.resize(recvbuf_size); - + // 8. Each COMM rank participates in the MPI_Alltoallv throw_assert_nomsg(mpi::alltoallv_vector(comm, MPI_CHAR, sendcounts, sdispls, sendbuf, recvcounts, rdispls, recvbuf) == MPI_SUCCESS); diff --git a/src/cell/scatter_read_tree.cc b/src/cell/scatter_read_tree.cc index 0c54ec5..4fe0c58 100644 --- a/src/cell/scatter_read_tree.cc +++ b/src/cell/scatter_read_tree.cc @@ -4,7 +4,7 @@ /// /// Read and scatter tree structures. /// -/// Copyright (C) 2016-2021 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. 
//============================================================================== #include @@ -59,7 +59,7 @@ namespace neuroh5 ) { std::vector sendbuf; - std::vector sendcounts, sdispls; + std::vector sendcounts, sdispls; MPI_Comm all_comm; // MPI Communicator for I/O ranks @@ -131,7 +131,7 @@ namespace neuroh5 throw_assert_nomsg(MPI_Comm_free(&io_comm) == MPI_SUCCESS); { - vector recvcounts, rdispls; + vector recvcounts, rdispls; vector recvbuf; throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, diff --git a/src/data/append_rank_edge_map.cc b/src/data/append_rank_edge_map.cc index 9d522e7..8e510a7 100644 --- a/src/data/append_rank_edge_map.cc +++ b/src/data/append_rank_edge_map.cc @@ -4,13 +4,14 @@ /// /// Populates a mapping between ranks and edge values. /// -/// Copyright (C) 2016-2020 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. //============================================================================== #include #include #include +#include "debug.hh" #include "neuroh5_types.hh" #include "attr_val.hh" #include "rank_range.hh" @@ -45,23 +46,32 @@ namespace neuroh5 ) { int ierr = 0; size_t dst_ptr_size; + NODE_IDX_T dst_base = 0; if (dst_blk_ptr.size() > 0) { dst_ptr_size = dst_ptr.size(); size_t num_dst = 0; - for (size_t b = 0; b < dst_blk_ptr.size()-1; ++b) + for (size_t b = 0; b < dst_blk_ptr.size(); ++b) { - size_t low_dst_ptr = dst_blk_ptr[b], + size_t low_dst_ptr = dst_blk_ptr[b], high_dst_ptr = 0; + if (b < dst_blk_ptr.size()-1) high_dst_ptr = dst_blk_ptr[b+1]; - - NODE_IDX_T dst_base = dst_idx[b]; + else + high_dst_ptr = dst_ptr_size; + + if (b < dst_idx.size()) + dst_base = dst_idx[b]; for (size_t i = low_dst_ptr, ii = 0; i < high_dst_ptr; ++i, ++ii) { - if (i < dst_ptr_size-1) { NODE_IDX_T dst = dst_base + ii + dst_start; - size_t low = dst_ptr[i], high = dst_ptr[i+1]; + size_t low = dst_ptr[i], high = 0; + if (i < dst_ptr_size-1) + high = dst_ptr[i+1]; + else + high = src_idx.size(); + if (high > low) { switch (edge_map_type) diff --git a/src/data/serialize_cell_attributes.cc b/src/data/serialize_cell_attributes.cc index 5cb939c..7a94709 100644 --- a/src/data/serialize_cell_attributes.cc +++ b/src/data/serialize_cell_attributes.cc @@ -38,32 +38,32 @@ namespace neuroh5 void serialize_rank_attr_map (const size_t num_ranks, const size_t start_rank, const map & rank_attr_map, - vector& sendcounts, + vector& sendcounts, vector &sendbuf, - vector &sdispls) + vector &sdispls) { - vector rank_sequence; + vector rank_sequence; sdispls.resize(num_ranks); sendcounts.resize(num_ranks); - size_t end_rank = num_ranks; + rank_t end_rank = num_ranks; throw_assert(start_rank < end_rank, "serialize_rank_attr_map: invalid start rank"); // Recommended all-to-all communication pattern: start at the current rank, then wrap around; // (as opposed to starting at rank 0) - for (int key_rank = start_rank; key_rank < end_rank; key_rank++) + for (rank_t key_rank = start_rank; key_rank < end_rank; key_rank++) { rank_sequence.push_back(key_rank); } - for (int key_rank = 0; key_rank < (int)start_rank; key_rank++) + for (rank_t key_rank = 0; key_rank < start_rank; key_rank++) { rank_sequence.push_back(key_rank); } size_t sendpos = 0; std::stringstream ss(ios::in | ios::out | ios::binary); - for (const int& key_rank : rank_sequence) + for (const rank_t& key_rank : rank_sequence) { sdispls[key_rank] = sendpos; @@ -94,19 +94,19 @@ namespace neuroh5 void deserialize_rank_attr_map (const size_t num_ranks, const vector &recvbuf, - const vector& 
recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, AttrMap& all_attr_map) { - const int recvbuf_size = recvbuf.size(); + const size_t recvbuf_size = recvbuf.size(); - for (size_t ridx = 0; ridx < num_ranks; ridx++) + for (rank_t ridx = 0; ridx < num_ranks; ridx++) { if (recvcounts[ridx] > 0) { - int recvsize = recvcounts[ridx]; - int recvpos = rdispls[ridx]; - int startpos = recvpos; + size_t recvsize = recvcounts[ridx]; + size_t recvpos = rdispls[ridx]; + size_t startpos = recvpos; AttrMap attr_map; throw_assert(recvpos < recvbuf_size, diff --git a/src/data/serialize_edge.cc b/src/data/serialize_edge.cc index a10b2cb..fab9c62 100644 --- a/src/data/serialize_edge.cc +++ b/src/data/serialize_edge.cc @@ -4,7 +4,7 @@ /// /// Top-level functions for serializing/deserializing graphs edges. /// -/// Copyright (C) 2016-2022 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. //============================================================================== #include "debug.hh" @@ -39,11 +39,11 @@ namespace neuroh5 const size_t start_rank, const rank_edge_map_t& prj_rank_edge_map, size_t &num_packed_edges, - vector& sendcounts, + vector& sendcounts, vector &sendbuf, - vector &sdispls) + vector &sdispls) { - vector rank_sequence; + vector rank_sequence; sendcounts.resize(num_ranks); sdispls.resize(num_ranks); @@ -53,18 +53,18 @@ namespace neuroh5 // Recommended all-to-all communication pattern: start at the current rank, then wrap around; // (as opposed to starting at rank 0) - for (int key_rank = start_rank; key_rank < end_rank; key_rank++) + for (rank_t key_rank = start_rank; key_rank < end_rank; key_rank++) { rank_sequence.push_back(key_rank); } - for (int key_rank = 0; key_rank < (int)start_rank; key_rank++) + for (rank_t key_rank = 0; key_rank < start_rank; key_rank++) { rank_sequence.push_back(key_rank); } size_t sendpos = 0; std::stringstream ss(ios::in | ios::out | ios::binary); - for (const int& key_rank : rank_sequence) + for (const rank_t& key_rank : rank_sequence) { sdispls[key_rank] = sendpos; @@ -125,22 +125,22 @@ namespace neuroh5 void deserialize_rank_edge_map (const size_t num_ranks, const vector &recvbuf, - const vector& recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, edge_map_t& prj_edge_map, size_t& num_unpacked_nodes, size_t& num_unpacked_edges ) { - const int recvbuf_size = recvbuf.size(); + const size_t recvbuf_size = recvbuf.size(); for (size_t ridx = 0; ridx < num_ranks; ridx++) { if (recvcounts[ridx] > 0) { - int recvsize = recvcounts[ridx]; - int recvpos = rdispls[ridx]; - int startpos = recvpos; + size_t recvsize = recvcounts[ridx]; + size_t recvpos = rdispls[ridx]; + size_t startpos = recvpos; edge_map_t edge_map; throw_assert(recvpos < recvbuf_size, diff --git a/src/data/serialize_tree.cc b/src/data/serialize_tree.cc index 91b2c7e..e730c3c 100644 --- a/src/data/serialize_tree.cc +++ b/src/data/serialize_tree.cc @@ -39,32 +39,32 @@ namespace neuroh5 void serialize_rank_tree_map (const size_t num_ranks, const size_t start_rank, const map >& rank_tree_map, - vector& sendcounts, + vector& sendcounts, vector &sendbuf, - vector &sdispls) + vector &sdispls) { - vector rank_sequence; + vector rank_sequence; sdispls.resize(num_ranks); sendcounts.resize(num_ranks); - int end_rank = num_ranks; + rank_t end_rank = num_ranks; throw_assert(start_rank < end_rank, "serialize_rank_tree_map: invalid start rank"); // Recommended all-to-all communication pattern: start at the current rank, then wrap 
around; // (as opposed to starting at rank 0) - for (int key_rank = start_rank; key_rank < end_rank; key_rank++) + for (rank_t key_rank = start_rank; key_rank < end_rank; key_rank++) { rank_sequence.push_back(key_rank); } - for (int key_rank = 0; key_rank < (int)start_rank; key_rank++) + for (rank_t key_rank = 0; key_rank < start_rank; key_rank++) { rank_sequence.push_back(key_rank); } size_t sendpos = 0; std::stringstream ss(ios::in | ios::out | ios::binary); - for (const int& key_rank : rank_sequence) + for (const rank_t& key_rank : rank_sequence) { sdispls[key_rank] = sendpos; @@ -97,20 +97,20 @@ namespace neuroh5 void deserialize_rank_tree_map (const size_t num_ranks, const vector &recvbuf, - const vector& recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, map &all_tree_map ) { - const int recvbuf_size = recvbuf.size(); + const size_t recvbuf_size = recvbuf.size(); for (size_t ridx = 0; ridx < num_ranks; ridx++) { if (recvcounts[ridx] > 0) { - int recvsize = recvcounts[ridx]; - int recvpos = rdispls[ridx]; - int startpos = recvpos; + size_t recvsize = recvcounts[ridx]; + size_t recvpos = rdispls[ridx]; + size_t startpos = recvpos; map tree_map; throw_assert(recvpos < recvbuf_size, @@ -133,20 +133,20 @@ namespace neuroh5 void deserialize_rank_tree_list (const size_t num_ranks, const vector &recvbuf, - const vector& recvcounts, - const vector& rdispls, + const vector& recvcounts, + const vector& rdispls, forward_list &all_tree_list ) { - const int recvbuf_size = recvbuf.size(); + const size_t recvbuf_size = recvbuf.size(); for (size_t ridx = 0; ridx < num_ranks; ridx++) { if (recvcounts[ridx] > 0) { - int recvsize = recvcounts[ridx]; - int recvpos = rdispls[ridx]; - int startpos = recvpos; + size_t recvsize = recvcounts[ridx]; + size_t recvpos = rdispls[ridx]; + size_t startpos = recvpos; map tree_map; throw_assert(recvpos < recvbuf_size, "deserialize_rank_tree_vector: invalid buffer displacement"); diff --git a/src/graph/append_graph.cc b/src/graph/append_graph.cc index 05df46f..25b0863 100644 --- a/src/graph/append_graph.cc +++ b/src/graph/append_graph.cc @@ -10,6 +10,7 @@ #include "neuroh5_types.hh" +#include "alltoallv_template.hh" #include "attr_map.hh" #include "cell_populations.hh" #include "append_graph.hh" @@ -262,8 +263,8 @@ namespace neuroh5 // send buffer and structures for MPI Alltoall operation - vector sendbuf; - vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); + vector sendbuf, recvbuf; + vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); // Create serialized object with the edges of vertices for the respective I/O rank size_t num_packed_edges = 0; @@ -272,40 +273,17 @@ namespace neuroh5 data::serialize_rank_edge_map (size, rank, rank_edge_map, num_packed_edges, sendcounts, sendbuf, sdispls); rank_edge_map.clear(); - - // 1. Each ALL_COMM rank sends an edge vector size to - // every other ALL_COMM rank (non IO_COMM ranks receive zero), - // and creates sendcounts and sdispls arrays - - throw_assert_nomsg(MPI_Alltoall(&sendcounts[0], 1, MPI_INT, &recvcounts[0], 1, MPI_INT, all_comm) == MPI_SUCCESS); - - throw_assert_nomsg(MPI_Barrier(all_comm) == MPI_SUCCESS); - - // 2. 
Each ALL_COMM rank accumulates the vector sizes and allocates - // a receive buffer, recvcounts, and rdispls - - size_t recvbuf_size = recvcounts[0]; - for (int p = 1; p < size; p++) - { - rdispls[p] = rdispls[p-1] + recvcounts[p-1]; - recvbuf_size += recvcounts[p]; - } - vector recvbuf; - recvbuf.resize(recvbuf_size > 0 ? recvbuf_size : 1, 0); - - // 3. Each ALL_COMM rank participates in the MPI_Alltoallv - throw_assert_nomsg(MPI_Alltoallv(&sendbuf[0], &sendcounts[0], &sdispls[0], MPI_CHAR, - &recvbuf[0], &recvcounts[0], &rdispls[0], MPI_CHAR, - all_comm) == MPI_SUCCESS); - throw_assert_nomsg(MPI_Barrier(all_comm) == MPI_SUCCESS); + throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, + recvcounts, rdispls, recvbuf) >= 0); sendbuf.clear(); + sendbuf.shrink_to_fit(); sendcounts.clear(); sdispls.clear(); size_t num_unpacked_edges = 0, num_unpacked_nodes = 0; edge_map_t prj_edge_map; - if (recvbuf_size > 0) + if (recvbuf.size() > 0) { data::deserialize_rank_edge_map (size, recvbuf, recvcounts, rdispls, prj_edge_map, num_unpacked_nodes, num_unpacked_edges); diff --git a/src/graph/node_attributes.cc b/src/graph/node_attributes.cc index 1afa88b..bf02934 100644 --- a/src/graph/node_attributes.cc +++ b/src/graph/node_attributes.cc @@ -4,7 +4,7 @@ /// /// Routines for manipulation of scalar and vector attributes associated with a graph node. /// -/// Copyright (C) 2016-2021 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. //============================================================================== #include "neuroh5_types.hh" @@ -469,7 +469,7 @@ namespace neuroh5 throw_assert_nomsg(io_size > 0); vector sendbuf; - vector sendcounts, sdispls, recvcounts, rdispls; + vector sendcounts, sdispls, recvcounts, rdispls; sendcounts.resize(size,0); sdispls.resize(size,0); @@ -559,24 +559,10 @@ namespace neuroh5 attr_map.insert_name(attr_names[data::AttrMap::attr_index_int32][i]); } - // 6. Each ALL_COMM rank sends an attribute set size to - // every other ALL_COMM rank (non IO_COMM ranks pass zero) - - throw_assert_nomsg(MPI_Alltoall(&sendcounts[0], 1, MPI_INT, - &recvcounts[0], 1, MPI_INT, all_comm) == MPI_SUCCESS); // 7. Each ALL_COMM rank accumulates the vector sizes and allocates // a receive buffer, recvcounts, and rdispls - size_t recvbuf_size; vector recvbuf; - - recvbuf_size = recvcounts[0]; - for (int p = 1; p < ssize; ++p) - { - rdispls[p] = rdispls[p-1] + recvcounts[p-1]; - recvbuf_size += recvcounts[p]; - } - if (recvbuf_size > 0) recvbuf.resize(recvbuf_size); // 8. Each ALL_COMM rank participates in the MPI_Alltoallv throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, diff --git a/src/graph/scatter_read_projection.cc b/src/graph/scatter_read_projection.cc index 1d6487d..a856406 100644 --- a/src/graph/scatter_read_projection.cc +++ b/src/graph/scatter_read_projection.cc @@ -5,7 +5,7 @@ /// Top-level functions for reading edges in DBS (Destination Block Sparse) /// format. /// -/// Copyright (C) 2016-2021 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. 
//============================================================================== #include "debug.hh" @@ -100,14 +100,14 @@ namespace neuroh5 { vector recvbuf; - vector recvcounts, rdispls; + vector recvcounts, rdispls; { vector sendbuf; - vector sendcounts(size,0), sdispls(size,0); + vector sendcounts(size,0), sdispls(size,0); mpi::MPI_DEBUG(all_comm, "scatter_read_projection: ", src_pop_name, " -> ", dst_pop_name, "\n"); - + if (is_io_rank) { int io_rank; @@ -158,10 +158,13 @@ namespace neuroh5 edge_map_type) >= 0); mpi::MPI_DEBUG(io_comm, "scatter_read_projection: read ", num_edges, - " edges from projection ", src_pop_name, " -> ", dst_pop_name); + " edges from projection ", src_pop_name, " -> ", dst_pop_name); // ensure that all edges in the projection have been read and appended to edge_list - throw_assert_nomsg(num_edges == src_idx.size()); + throw_assert(num_edges == src_idx.size(), + "edge count mismatch: num_edges = " << num_edges << + " src_idx.size = " << src_idx.size()); + size_t num_packed_edges = 0; @@ -182,12 +185,16 @@ namespace neuroh5 recvcounts, rdispls, recvbuf) >= 0); } + mpi::MPI_DEBUG(all_comm, "scatter_read_projection: recvbuf size is ", recvbuf.size()); + if (recvbuf.size() > 0) { data::deserialize_rank_edge_map (size, recvbuf, recvcounts, rdispls, prj_edge_map, local_num_nodes, local_num_edges); } + mpi::MPI_DEBUG(all_comm, "scatter_read_projection: prj_edge_map size is ", prj_edge_map.size()); + if (!attr_namespaces.empty()) { vector sendbuf; uint32_t sendbuf_size=0; diff --git a/src/graph/scatter_read_projection_selection.cc b/src/graph/scatter_read_projection_selection.cc index bb2c677..a8e6bc5 100644 --- a/src/graph/scatter_read_projection_selection.cc +++ b/src/graph/scatter_read_projection_selection.cc @@ -5,7 +5,7 @@ /// Functions for reading edge information in DBS (Destination Block Sparse) /// format. /// -/// Copyright (C) 2016-2021 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. //============================================================================== @@ -93,11 +93,11 @@ namespace neuroh5 { vector recvbuf; - vector recvcounts, rdispls; + vector recvcounts, rdispls; { vector sendbuf; - vector sendcounts(size,0), sdispls(size,0); + vector sendcounts(size,0), sdispls(size,0); if (is_io_rank) { diff --git a/src/graph/write_graph.cc b/src/graph/write_graph.cc index 36d3d46..b0d1569 100644 --- a/src/graph/write_graph.cc +++ b/src/graph/write_graph.cc @@ -11,6 +11,7 @@ #include "neuroh5_types.hh" +#include "alltoallv_template.hh" #include "attr_map.hh" #include "cell_populations.hh" #include "write_graph.hh" @@ -267,9 +268,9 @@ namespace neuroh5 } - // send buffer and structures for MPI Alltoall operation - vector sendbuf; - vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); + // send buffer and structures for MPI Alltoallv operation + vector sendbuf, recvbuf; + vector sendcounts(size,0), sdispls(size,0), recvcounts(size,0), rdispls(size,0); // Create serialized object with the edges of vertices for the respective I/O rank size_t num_packed_edges = 0; @@ -277,44 +278,27 @@ namespace neuroh5 data::serialize_rank_edge_map (size, rank, rank_edge_map, num_packed_edges, sendcounts, sendbuf, sdispls); rank_edge_map.clear(); - - // 1. 
Each ALL_COMM rank sends an edge vector size to - // every other ALL_COMM rank (non IO_COMM ranks receive zero), - // and creates sendcounts and sdispls arrays - - throw_assert_nomsg(MPI_Alltoall(&sendcounts[0], 1, MPI_INT, &recvcounts[0], 1, MPI_INT, all_comm) == MPI_SUCCESS); - - // 2. Each ALL_COMM rank accumulates the vector sizes and allocates - // a receive buffer, recvcounts, and rdispls - - size_t recvbuf_size = recvcounts[0]; - for (int p = 1; p < size; p++) - { - rdispls[p] = rdispls[p-1] + recvcounts[p-1]; - recvbuf_size += recvcounts[p]; - } - vector recvbuf; - recvbuf.resize(recvbuf_size > 0 ? recvbuf_size : 1, 0); - - // 3. Each ALL_COMM rank participates in the MPI_Alltoallv - throw_assert_nomsg(MPI_Alltoallv(&sendbuf[0], &sendcounts[0], &sdispls[0], MPI_CHAR, - &recvbuf[0], &recvcounts[0], &rdispls[0], MPI_CHAR, - all_comm) == MPI_SUCCESS); - throw_assert_nomsg(MPI_Barrier(all_comm) == MPI_SUCCESS); + throw_assert_nomsg(mpi::alltoallv_vector(all_comm, MPI_CHAR, sendcounts, sdispls, sendbuf, + recvcounts, rdispls, recvbuf) >= 0); sendbuf.clear(); + sendbuf.shrink_to_fit(); sendcounts.clear(); sdispls.clear(); size_t num_unpacked_edges = 0, num_unpacked_nodes = 0; edge_map_t prj_edge_map; - if (recvbuf_size > 0) + if (recvbuf.size() > 0) { data::deserialize_rank_edge_map (size, recvbuf, recvcounts, rdispls, prj_edge_map, num_unpacked_nodes, num_unpacked_edges); } + recvbuf.clear(); + recvcounts.clear(); + rdispls.clear(); + if ((rank_t)rank < io_size) { hid_t fapl = H5Pcreate(H5P_FILE_ACCESS); diff --git a/src/hdf5/read_projection_datasets.cc b/src/hdf5/read_projection_datasets.cc index 396f823..c7cef2a 100644 --- a/src/hdf5/read_projection_datasets.cc +++ b/src/hdf5/read_projection_datasets.cc @@ -5,7 +5,7 @@ /// Functions for reading edge information in DBS (Destination Block Sparse) /// format. /// -/// Copyright (C) 2016-2020 Project NeuroH5. +/// Copyright (C) 2016-2024 Project NeuroH5. 
//============================================================================== #include @@ -73,8 +73,8 @@ namespace neuroh5 // determine number of blocks in projection hsize_t num_blocks = hdf5::dataset_num_elements (file, hdf5::edge_attribute_path(src_pop_name, dst_pop_name, hdf5::EDGES, hdf5::DST_BLK_PTR)); - if (num_blocks > 0) - num_blocks--; + //if (num_blocks > 0) + // num_blocks--; // determine number of edges in projection total_num_edges = hdf5::dataset_num_elements @@ -123,12 +123,12 @@ namespace neuroh5 hsize_t block; if (stop > start) - block = stop - start + 1; + block = stop - start; else block = 0; if (block > 0) { - local_read_blocks = block-1; + local_read_blocks = block; } else { @@ -161,6 +161,14 @@ namespace neuroh5 // rebase the block_ptr array to local offsets // REBASE is going to be the start offset for the hyperslab + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "start = ", start, " ", + "block = ", block, " ", + "dst_blk_ptr.size = ", dst_blk_ptr.size(), " ", + "dst_blk_ptr.front = ", dst_blk_ptr.front(), " ", + "dst_blk_ptr.back = ", dst_blk_ptr.back(), " ", + "\n"); if (block > 0) { @@ -207,18 +215,73 @@ namespace neuroh5 { dst_ptr_start = (hsize_t)block_rebase; dst_ptr_block = (hsize_t)(dst_blk_ptr.back() - dst_blk_ptr.front()); - if (stop < num_blocks) - { - dst_ptr_block ++; - } + //if (stop < num_blocks) + // { + // dst_ptr_block ++; + // } } - + + // Determine upper bound for dst ptr + { + int status; + MPI_Request request; + vector dst_ptr_ranges(size, 0); + + status = MPI_Iallgather(&block_rebase, 1, MPI_ATTR_PTR_T, + &dst_ptr_ranges[0], 1, MPI_ATTR_PTR_T, comm, + &request); + throw_assert(status == MPI_SUCCESS, + "read_projection_datasets: error in MPI_Iallgather: status: " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "read_projection_datasets: error in MPI_Wait: status: " << status); + if ((rank < (size-1)) && ((dst_ptr_start + dst_ptr_block) < (dst_ptr_ranges[rank+1]))) + { + dst_ptr_block = dst_ptr_ranges[rank+1] - dst_ptr_start; + } + + if (rank < (size-1)) + { + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "dst_ptr_start = ", dst_ptr_start, " ", + "dst_ptr_block = ", dst_ptr_block, " ", + "dst_ptr_ranges[", rank+1, "] = ", dst_ptr_ranges[rank+1], " ", + "start = ", start, " ", + "block = ", block, " ", + "dst_blk_ptr.size = ", dst_blk_ptr.size(), " ", + "dst_blk_ptr.front = ", dst_blk_ptr.front(), " ", + "dst_blk_ptr.back = ", dst_blk_ptr.back(), " ", + "\n"); + } + else + { + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "dst_ptr_start = ", dst_ptr_start, " ", + "dst_ptr_block = ", dst_ptr_block, " ", + "start = ", start, " ", + "block = ", block, " ", + "dst_blk_ptr.size = ", dst_blk_ptr.size(), " ", + "dst_blk_ptr.front = ", dst_blk_ptr.front(), " ", + "dst_blk_ptr.back = ", dst_blk_ptr.back(), + "\n"); + } + } + if (dst_ptr_block > 0) { dst_ptr.resize(dst_ptr_block, 0); } + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "dst_ptr_start = ", dst_ptr_start, " ", + "dst_ptr_block = ", dst_ptr_block, " ", + "dst_ptr.size = ", dst_ptr.size(), " ", + "\n"); + ierr = hdf5::read ( file, @@ -230,10 +293,17 @@ namespace neuroh5 rapl ); throw_assert_nomsg(ierr >= 0); - + + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "dst_ptr_start = ", dst_ptr_start, 
" ", + "dst_ptr_block = ", dst_ptr_block, " ", + "dst_ptr.size = ", dst_ptr.size(), " ", + "dst_ptr.front = ", (dst_ptr.size() > 0) ? dst_ptr.front() : 0, " ", + "dst_ptr.back = ", (dst_ptr.size() > 0) ? dst_ptr.back() : 0, " ", + "\n"); + DST_PTR_T dst_rebase = 0; - - hsize_t src_idx_block=0, src_idx_start=0; if (dst_ptr_block > 0) { dst_rebase = dst_ptr[0]; @@ -242,18 +312,55 @@ namespace neuroh5 { dst_ptr[i] -= dst_rebase; } - - // read source indices + } + + hsize_t src_idx_block=0, src_idx_start=0; + + if (dst_ptr.size() > 0) + { + // determine source indices src_idx_start = dst_rebase; src_idx_block = (hsize_t)(dst_ptr.back() - dst_ptr.front()); + } - // allocate buffer and memory dataspace - if (src_idx_block > 0) - { - src_idx.resize(src_idx_block, 0); - } + // Determine upper bound for src idx + { + int status; + MPI_Request request; + vector src_idx_ranges(size, 0); + + status = MPI_Iallgather(&dst_rebase, 1, MPI_ATTR_PTR_T, + &src_idx_ranges[0], 1, MPI_ATTR_PTR_T, comm, + &request); + throw_assert(status == MPI_SUCCESS, + "read_projection_datasets: error in MPI_Iallgather: status: " << status); + status = MPI_Wait(&request, MPI_STATUS_IGNORE); + throw_assert(status == MPI_SUCCESS, + "read_projection_datasets: error in MPI_Wait: status: " << status); + if ((rank < (size-1)) && ((src_idx_start + src_idx_block) < (src_idx_ranges[rank+1]))) + { + src_idx_block = src_idx_ranges[rank+1] - src_idx_start; + } + + + } + + // allocate buffer and memory dataspace + if (src_idx_block > 0) + { + src_idx.resize(src_idx_block, 0); } + mpi::MPI_DEBUG(comm, "read_projection_dataset: ", + src_pop_name, " -> ", dst_pop_name, ": ", + "dst_blk_ptr.size = ", dst_blk_ptr.size(), " ", + "dst_blk_ptr.front = ", dst_blk_ptr.front(), " ", + "dst_blk_ptr.back = ", dst_blk_ptr.back(), " ", + "src_idx_start = ", src_idx_start, " ", + "src_idx_block = ", src_idx_block, " ", + "src_idx.size = ", src_idx.size(), + "\n"); + ierr = hdf5::read ( file, @@ -265,7 +372,6 @@ namespace neuroh5 rapl ); throw_assert_nomsg(ierr >= 0); - throw_assert_nomsg(H5Pclose(rapl) >= 0); } throw_assert_nomsg(H5Fclose(file) >= 0); From 716681b963bb38ad0d614f3083385357789a50d1 Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Wed, 8 Jan 2025 13:46:59 -0800 Subject: [PATCH 2/4] added chunked write for append_edge_attributes --- include/hdf5/hdf5_edge_attributes.hh | 47 ++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/include/hdf5/hdf5_edge_attributes.hh b/include/hdf5/hdf5_edge_attributes.hh index 1665140..d52e167 100644 --- a/include/hdf5/hdf5_edge_attributes.hh +++ b/include/hdf5/hdf5_edge_attributes.hh @@ -22,6 +22,8 @@ namespace neuroh5 namespace hdf5 { + // Constants for chunking + constexpr size_t EDGE_ATTR_CHUNK_SIZE = 1ULL << 30; // 1GB chunks for safety margin void size_edge_attributes ( @@ -92,13 +94,15 @@ namespace neuroh5 attr_name, current_value_size); + MPI_Request request; size_t my_count = value.size(); std::vector all_counts(size, 0); - throw_assert(MPI_Allgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1, - MPI_SIZE_T, comm) == MPI_SUCCESS, - "append_edge_attribute: error in MPI_Allgather"); - throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, - "append_edge_attribute: error in MPI_Barrier"); + throw_assert(MPI_Iallgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1, + MPI_SIZE_T, comm, &request) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Iallgather"); + + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Wait"); 
// calculate the total dataset size and the offset of my piece hsize_t local_value_start = current_value_size, @@ -126,10 +130,35 @@ namespace neuroh5 string path = edge_attribute_path(src_pop_name, dst_pop_name, attr_namespace, attr_name); - status = write (file, path, - global_value_size, local_value_start, local_value_size, - mtype, value, wapl); - + while (global_value_size - current_value_size > 0) + { + hsize_t local_write_size = std::min((hsize_t)EDGE_ATTR_CHUNK_SIZE, local_value_size); + std::vector all_write_counts(size, 0); + + throw_assert(MPI_Iallgather(&local_write_size, 1, MPI_SIZE_T, &all_write_counts[0], 1, + MPI_SIZE_T, comm, &request) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Iallgather"); + + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Wait"); + + status = write (file, path, + global_value_size, local_value_start, local_write_size, + mtype, value, wapl); + + if (local_value_size > 0) + { + local_value_size -= local_write_size; + local_value_start += local_write_size; + } + + + for (size_t p = 0; p < size; ++p) + { + current_value_size += (hsize_t) all_write_counts[p]; + } + + } throw_assert(H5Pclose(wapl) >= 0, "error in H5Pclose"); } From 098723227bd978c09746ce0413276baa9ed92c07 Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Wed, 8 Jan 2025 14:07:02 -0800 Subject: [PATCH 3/4] using more non-blocking operations in append_graph/append_projection --- src/graph/append_graph.cc | 26 ++++++++++++------- src/graph/append_projection.cc | 46 ++++++++++++++++++++++------------ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/src/graph/append_graph.cc b/src/graph/append_graph.cc index 25b0863..f68b9a0 100644 --- a/src/graph/append_graph.cc +++ b/src/graph/append_graph.cc @@ -111,15 +111,20 @@ namespace neuroh5 { // Determine the destination node indices present in the input // edge map across all ranks + MPI_Request request; size_t num_nodes = input_edge_map.size(); vector sendbuf_num_nodes(size, num_nodes); vector recvbuf_num_nodes(size); vector recvcounts(size, 0); vector displs(size+1, 0); - throw_assert_nomsg(MPI_Allgather(&sendbuf_num_nodes[0], 1, MPI_SIZE_T, - &recvbuf_num_nodes[0], 1, MPI_SIZE_T, all_comm) - == MPI_SUCCESS); - throw_assert_nomsg(MPI_Barrier(all_comm) == MPI_SUCCESS); + throw_assert(MPI_Iallgather(&sendbuf_num_nodes[0], 1, MPI_SIZE_T, + &recvbuf_num_nodes[0], 1, MPI_SIZE_T, + all_comm, + &request) == MPI_SUCCESS, + "append_graph: error in MPI_Iallgather"); + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_graph: error in MPI_Wait"); + for (size_t p=0; p p = sort_permutation(node_index, compare_nodes); apply_permutation_in_place(node_index, p); } diff --git a/src/graph/append_projection.cc b/src/graph/append_projection.cc index 255fbbc..6aae55f 100644 --- a/src/graph/append_projection.cc +++ b/src/graph/append_projection.cc @@ -506,6 +506,8 @@ namespace neuroh5 const bool collective ) { + MPI_Request request; + // do a sanity check on the input throw_assert_nomsg(src_start < src_end); throw_assert_nomsg(dst_start < dst_end); @@ -558,11 +560,13 @@ namespace neuroh5 throw_assert_nomsg(num_edges == src_idx.size()); size_t sum_num_edges = 0; - throw_assert_nomsg(MPI_Allreduce(&num_edges, &sum_num_edges, 1, - MPI_SIZE_T, MPI_SUM, comm) == MPI_SUCCESS); - - throw_assert_nomsg(MPI_Barrier(comm) == MPI_SUCCESS); - + throw_assert(MPI_Iallreduce(&num_edges, &sum_num_edges, 1, + MPI_SIZE_T, MPI_SUM, + comm, &request) == MPI_SUCCESS, + 
"append_projection: error in MPI_Iallreduce"); + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_projection: error in MPI_Wait"); + if (sum_num_edges == 0) { return; @@ -604,23 +608,33 @@ namespace neuroh5 vector sendbuf_num_blocks(size, num_blocks); vector recvbuf_num_blocks(size, 0); - throw_assert_nomsg(MPI_Allgather(&sendbuf_num_blocks[0], 1, MPI_SIZE_T, - &recvbuf_num_blocks[0], 1, MPI_SIZE_T, comm) - == MPI_SUCCESS); + throw_assert(MPI_Iallgather(&sendbuf_num_blocks[0], 1, MPI_SIZE_T, + &recvbuf_num_blocks[0], 1, MPI_SIZE_T, + comm, &request) + == MPI_SUCCESS, + "append_projection: error in MPI_Iallgather"); + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_projection: error in MPI_Wait"); vector sendbuf_num_dest(size, num_dest); vector recvbuf_num_dest(size, 0); - throw_assert_nomsg(MPI_Allgather(&sendbuf_num_dest[0], 1, MPI_SIZE_T, - &recvbuf_num_dest[0], 1, MPI_SIZE_T, comm) - == MPI_SUCCESS); + throw_assert(MPI_Iallgather(&sendbuf_num_dest[0], 1, MPI_SIZE_T, + &recvbuf_num_dest[0], 1, MPI_SIZE_T, + comm, &request) + == MPI_SUCCESS, + "append_projection: error in MPI_Iallgather"); + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_projection: error in MPI_Wait"); vector sendbuf_num_edge(size, num_edges); vector recvbuf_num_edge(size, 0); - throw_assert_nomsg(MPI_Allgather(&sendbuf_num_edge[0], 1, MPI_SIZE_T, - &recvbuf_num_edge[0], 1, MPI_SIZE_T, comm) - == MPI_SUCCESS); - - throw_assert_nomsg(MPI_Barrier(comm) == MPI_SUCCESS); + throw_assert(MPI_Iallgather(&sendbuf_num_edge[0], 1, MPI_SIZE_T, + &recvbuf_num_edge[0], 1, MPI_SIZE_T, + comm, &request) + == MPI_SUCCESS, + "append_projection: error in MPI_Iallgather"); + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_projection: error in MPI_Wait"); // determine last rank that has data size_t last_rank = size-1; From 5a527b831a3c438f2915085932107ba6af5fd2ce Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Wed, 8 Jan 2025 14:41:27 -0800 Subject: [PATCH 4/4] version set to 0.1.14 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 090b162..7fa67b0 100644 --- a/setup.py +++ b/setup.py @@ -141,7 +141,7 @@ def build_extensions(self): name="NeuroH5", package_dir={"": "python"}, packages=["neuroh5"], - version="0.1.13", + version="0.1.14", maintainer="Ivan Raikov", maintainer_email="ivan.g.raikov@gmail.com", description="A parallel HDF5-based library for storage and processing of large-scale graphs and neural cell model attributes.",