
Commit 5469888

WIP:

- introducing chunked all-to-all collective communication in order to work around 2GB MPI limit;
- some refactoring of read_projection_datasets in order to avoid haphazard increments of end ranges;
iraikov committed Jan 7, 2025
1 parent 294f192 commit 5469888
Showing 22 changed files with 548 additions and 266 deletions.
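For orientation: MPI count and displacement arguments are plain int, so a single Allgatherv/Alltoallv call cannot describe more than 2^31 - 1 elements per rank, and the templates added in this commit therefore loop over fixed-size rounds. The sketch below is illustrative only (the function and variable names are not from the commit); it mirrors the round structure of include/mpi/allgatherv_template.hh shown further down this page and assumes size_t maps to MPI_UNSIGNED_LONG, where the commit itself uses a project-defined MPI_SIZE_T.

#include <mpi.h>
#include <algorithm>
#include <cstddef>
#include <vector>

constexpr size_t CHUNK = 1ULL << 30;  // elements per round, as in chunk_info.hh

template <class T>
void chunked_allgatherv(MPI_Comm comm, MPI_Datatype dtype,
                        const std::vector<T>& sendbuf,
                        std::vector<T>& recvbuf,                // pre-sized to sum(recvcounts)
                        const std::vector<size_t>& recvcounts,  // recvcounts[i] == rank i's send size
                        const std::vector<size_t>& rdispls)
{
  size_t chunk_start = 0;
  while (true)
    {
      // Every rank must agree on termination, so reduce the remaining element counts.
      size_t remaining = (sendbuf.size() > chunk_start) ? sendbuf.size() - chunk_start : 0;
      size_t global_remaining = 0;
      MPI_Allreduce(&remaining, &global_remaining, 1, MPI_UNSIGNED_LONG, MPI_SUM, comm);
      if (global_remaining == 0) break;

      // Per-round counts are clamped to CHUNK, so they always fit in an int
      // (the displacements rdispls[i] + chunk_start must still fit in an int).
      std::vector<int> cnts(recvcounts.size()), dsps(recvcounts.size());
      for (size_t i = 0; i < recvcounts.size(); ++i)
        {
          size_t left = (chunk_start < recvcounts[i]) ? recvcounts[i] - chunk_start : 0;
          cnts[i] = static_cast<int>(std::min(left, CHUNK));
          dsps[i] = static_cast<int>(rdispls[i] + chunk_start);
        }

      size_t current = std::min(remaining, CHUNK);
      const T* sptr = sendbuf.data() + std::min(chunk_start, sendbuf.size());
      MPI_Allgatherv(sptr, static_cast<int>(current), dtype,
                     recvbuf.data(), cnts.data(), dsps.data(), dtype, comm);

      chunk_start += CHUNK;  // advance uniformly on all ranks
    }
}

The new allgatherv_vector follows this shape, and the all-to-all variants named in the commit title (not loaded on this page) presumably do as well, with the per-chunk bookkeeping factored into include/data/chunk_info.hh.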
14 changes: 7 additions & 7 deletions include/cell/cell_attributes.hh
@@ -466,7 +466,7 @@ namespace neuroh5

vector<CELL_IDX_T> gid_recvbuf;
{
- vector<int> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
+ vector<size_t> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
idx_sendcounts[io_dests[rank]] = local_index_vector.size();

throw_assert(mpi::alltoallv_vector<CELL_IDX_T>(comm, MPI_CELL_IDX_T,
@@ -478,7 +478,7 @@ namespace neuroh5
vector<ATTR_PTR_T> attr_ptr;
{
vector<ATTR_PTR_T> attr_size_recvbuf;
- vector<int> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
+ vector<size_t> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size();

throw_assert(mpi::alltoallv_vector<ATTR_PTR_T>(comm, MPI_ATTR_PTR_T,
@@ -522,7 +522,7 @@ namespace neuroh5
local_value_vector.insert(local_value_vector.end(),v.begin(),v.end());
}

- vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
+ vector<size_t> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;


@@ -902,7 +902,7 @@ namespace neuroh5

vector<CELL_IDX_T> gid_recvbuf;
{
- vector<int> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
+ vector<size_t> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
idx_sendcounts[io_dests[rank]] = local_index_vector.size();

throw_assert(mpi::alltoallv_vector<CELL_IDX_T>(comm, MPI_CELL_IDX_T,
@@ -914,7 +914,7 @@ namespace neuroh5
vector<ATTR_PTR_T> attr_ptr;
{
vector<ATTR_PTR_T> attr_size_recvbuf;
- vector<int> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
+ vector<size_t> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size();

throw_assert(mpi::alltoallv_vector<ATTR_PTR_T>(comm, MPI_ATTR_PTR_T,
@@ -925,7 +925,7 @@
if ((is_io_rank) && (attr_size_recvbuf.size() > 0))
{
ATTR_PTR_T attr_ptr_offset = 0;
- for (size_t s=0; s<ssize; s++)
+ for (size_t s=0; s<size; s++)
{
int count = attr_size_recvcounts[s];
for (size_t i=attr_size_rdispls[s]; i<attr_size_rdispls[s]+count; i++)
@@ -957,7 +957,7 @@ namespace neuroh5
local_value_vector.insert(local_value_vector.end(),v.begin(),v.end());
}

- vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
+ vector<size_t> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;


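The int to size_t switch in these hunks is what makes the chunking possible: the full per-rank counts and displacements can now exceed the range of MPI's int arguments and are narrowed to int only one bounded chunk at a time. A standalone illustration of the limit (not code from the commit):

#include <climits>
#include <cstddef>
#include <cstdio>

int main()
{
  // Roughly 3 billion attribute values on one rank: too many to describe
  // with a single int count, but unproblematic as a size_t.
  size_t nvalues = 3'000'000'000ULL;
  std::printf("fits in an int count? %s\n",
              nvalues <= static_cast<size_t>(INT_MAX) ? "yes" : "no");  // prints "no"
  std::printf("bytes as size_t: %zu\n", nvalues * sizeof(float));       // 12000000000, no overflow
  return 0;
}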
69 changes: 69 additions & 0 deletions include/data/chunk_info.hh
@@ -0,0 +1,69 @@
#ifndef CHUNK_INFO_HH
#define CHUNK_INFO_HH

#include <set>
#include <vector>     // std::vector members of ChunkInfo
#include <algorithm>  // std::min
#include <cstddef>    // size_t

using namespace std;

namespace neuroh5
{
namespace data
{

// Constants for chunking: 2^30 elements per chunk keeps every per-call
// MPI count safely below the 2^31-1 limit of int count arguments
constexpr size_t CHUNK_SIZE = 1ULL << 30;

template<typename T>
struct ChunkInfo {
std::vector<int> sendcounts;
std::vector<int> sdispls;
std::vector<int> recvcounts;
std::vector<int> rdispls;
size_t total_send_size;
size_t total_recv_size;
};


template<typename T>
ChunkInfo<T> calculate_chunk_sizes(
const std::vector<size_t>& full_sendcounts,
const std::vector<size_t>& full_sdispls,
const std::vector<size_t>& full_recvcounts,
const std::vector<size_t>& full_rdispls,
size_t chunk_start,
size_t chunk_size)
{
const size_t size = full_sendcounts.size();
ChunkInfo<T> chunk;
chunk.sendcounts.resize(size);
chunk.sdispls.resize(size);
chunk.recvcounts.resize(size);
chunk.rdispls.resize(size);

chunk.total_send_size = 0;
chunk.total_recv_size = 0;

for (size_t i = 0; i < size; ++i) {
// Calculate how much data to send in this chunk
size_t send_remaining = (chunk_start < full_sendcounts[i]) ?
full_sendcounts[i] - chunk_start : 0;
chunk.sendcounts[i] = static_cast<int>(std::min(send_remaining, chunk_size));
chunk.sdispls[i] = static_cast<int>(full_sdispls[i] + chunk_start);
chunk.total_send_size += chunk.sendcounts[i];

// Calculate how much data to receive in this chunk
size_t recv_remaining = (chunk_start < full_recvcounts[i]) ?
full_recvcounts[i] - chunk_start : 0;
chunk.recvcounts[i] = static_cast<int>(std::min(recv_remaining, chunk_size));
chunk.rdispls[i] = static_cast<int>(full_rdispls[i] + chunk_start);
chunk.total_recv_size += chunk.recvcounts[i];
}

return chunk;
}


}
}
#endif
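A possible call pattern for the helper above (illustrative only: full_sendcounts, full_sdispls, full_recvcounts, full_rdispls, sendbuf, recvbuf, and comm are assumed to be set up by the caller, and MPI_UNSIGNED_LONG stands in for the project's MPI_SIZE_T). The caller keeps the full size_t bookkeeping and issues one bounded MPI_Alltoallv per chunk:

size_t chunk_start = 0;
while (true)
  {
    auto chunk = neuroh5::data::calculate_chunk_sizes<char>(
        full_sendcounts, full_sdispls, full_recvcounts, full_rdispls,
        chunk_start, neuroh5::data::CHUNK_SIZE);

    // Stop once no rank has anything left to send in this window.
    size_t local_left = chunk.total_send_size, global_left = 0;
    MPI_Allreduce(&local_left, &global_left, 1, MPI_UNSIGNED_LONG, MPI_SUM, comm);
    if (global_left == 0) break;

    // The per-chunk counts and displacements are ints bounded by CHUNK_SIZE,
    // so every individual collective stays within MPI's int limits.
    MPI_Alltoallv(sendbuf.data(), chunk.sendcounts.data(), chunk.sdispls.data(), MPI_CHAR,
                  recvbuf.data(), chunk.recvcounts.data(), chunk.rdispls.data(), MPI_CHAR, comm);

    chunk_start += neuroh5::data::CHUNK_SIZE;
  }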
10 changes: 5 additions & 5 deletions include/data/serialize_cell_attributes.hh
@@ -4,7 +4,7 @@
///
/// Functions for serializing cell attributes.
///
- /// Copyright (C) 2017-2020 Project Neuroh5.
+ /// Copyright (C) 2017-2024 Project Neuroh5.
//==============================================================================

#ifndef SERIALIZE_CELL_ATTRIBUTES_HH
@@ -27,14 +27,14 @@ namespace neuroh5
void serialize_rank_attr_map (const size_t num_ranks,
const size_t start_rank,
const map <rank_t, AttrMap>& rank_attr_map,
- std::vector<int>& sendcounts,
+ std::vector<size_t>& sendcounts,
std::vector<char> &sendbuf,
- std::vector<int> &sdispls);
+ std::vector<size_t> &sdispls);

void deserialize_rank_attr_map (const size_t num_ranks,
const std::vector<char> &recvbuf,
- const std::vector<int>& recvcounts,
- const std::vector<int>& rdispls,
+ const std::vector<size_t>& recvcounts,
+ const std::vector<size_t>& rdispls,
AttrMap& all_attr_map);


10 changes: 5 additions & 5 deletions include/data/serialize_edge.hh
@@ -4,7 +4,7 @@
///
/// Functions for serializing edge data.
///
- /// Copyright (C) 2017 Project Neuroh5.
+ /// Copyright (C) 2017-2024 Project Neuroh5.
//==============================================================================

#ifndef SERIALIZE_EDGE_HH
@@ -31,14 +31,14 @@ namespace neuroh5
const size_t start_rank,
const rank_edge_map_t& prj_rank_edge_map,
size_t &num_packed_edges,
- vector<int>& sendcounts,
+ vector<size_t>& sendcounts,
vector<char> &sendbuf,
- vector<int> &sdispls);
+ vector<size_t> &sdispls);

void deserialize_rank_edge_map (const size_t num_ranks,
const vector<char> &recvbuf,
- const vector<int>& recvcounts,
- const vector<int>& rdispls,
+ const vector<size_t>& recvcounts,
+ const vector<size_t>& rdispls,
edge_map_t& prj_edge_map,
size_t& num_unpacked_nodes,
size_t& num_unpacked_edges
12 changes: 6 additions & 6 deletions include/data/serialize_tree.hh
@@ -26,21 +26,21 @@ namespace neuroh5
void serialize_rank_tree_map (const size_t num_ranks,
const size_t start_rank,
const std::map <rank_t, std::map<CELL_IDX_T, neurotree_t> >& rank_tree_map,
- std::vector<int>& sendcounts,
+ std::vector<size_t>& sendcounts,
std::vector<char> &sendbuf,
- std::vector<int> &sdispls);
+ std::vector<size_t> &sdispls);

void deserialize_rank_tree_map (const size_t num_ranks,
const std::vector<char> &recvbuf,
- const std::vector<int>& recvcounts,
- const std::vector<int>& rdispls,
+ const std::vector<size_t>& recvcounts,
+ const std::vector<size_t>& rdispls,
std::map<CELL_IDX_T, neurotree_t> &all_tree_map
);

void deserialize_rank_tree_list (const size_t num_ranks,
const vector<char> &recvbuf,
- const vector<int>& recvcounts,
- const vector<int>& rdispls,
+ const vector<size_t>& recvcounts,
+ const vector<size_t>& rdispls,
forward_list<neurotree_t> &all_tree_list);
}
}
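To see how the size_t-based serialization interfaces above fit together, here is a hypothetical round trip using the edge-map variant from serialize_edge.hh (the maps, the rank count `size`, the start rank of 0, the namespace qualifications, and the elided exchange step are assumptions; the cell-attribute and tree variants follow the same pattern):

// Pack this rank's per-destination edges into a byte buffer plus size_t
// counts and displacements, one entry per destination rank.
size_t num_packed_edges = 0;
std::vector<char> sendbuf, recvbuf;
std::vector<size_t> sendcounts, sdispls, recvcounts, rdispls;
neuroh5::data::serialize_rank_edge_map(size, 0, prj_rank_edge_map, num_packed_edges,
                                       sendcounts, sendbuf, sdispls);

// ... chunked all-to-all exchange of sendbuf into recvbuf / recvcounts / rdispls ...

// Unpack whatever arrived into a single edge map.
edge_map_t prj_edge_map;
size_t num_unpacked_nodes = 0, num_unpacked_edges = 0;
neuroh5::data::deserialize_rank_edge_map(size, recvbuf, recvcounts, rdispls,
                                         prj_edge_map, num_unpacked_nodes, num_unpacked_edges);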
5 changes: 3 additions & 2 deletions include/graph/node_attributes.hh
@@ -213,6 +213,9 @@ namespace neuroh5
value_offset = value_offset + v.size();
}
//attr_ptr.push_back(value_offset);

+ T dummy;
+ MPI_Datatype mpi_type = infer_mpi_datatype(dummy);

vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;
@@ -241,8 +244,6 @@ namespace neuroh5
//assert(recvbuf_size > 0);
vector<T> value_recvbuf(value_recvbuf_size);

- T dummy;
- MPI_Datatype mpi_type = infer_mpi_datatype(dummy);
throw_assert(MPI_Alltoallv(&value_vector[0], &value_sendcounts[0], &value_sdispls[0], mpi_type,
&value_recvbuf[0], &value_recvcounts[0], &value_rdispls[0], mpi_type,
comm) == MPI_SUCCESS, "error in MPI_Alltoallv");
148 changes: 148 additions & 0 deletions include/mpi/allgatherv_template.hh
@@ -0,0 +1,148 @@
// -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
//==============================================================================
/// @file allgatherv_template.hh
///
/// Function for sending data via MPI Allgatherv.
///
/// Copyright (C) 2017-2024 Project NeuroH5.
//==============================================================================

#ifndef ALLGATHERV_TEMPLATE_HH
#define ALLGATHERV_TEMPLATE_HH

#include <mpi.h>

#include <vector>
#include <map>

#include "mpi_debug.hh"
#include "throw_assert.hh"
#include "neuroh5_types.hh"
#include "attr_map.hh"
#include "chunk_info.hh"

using namespace std;


namespace neuroh5
{

namespace mpi
{

template<class T>
int allgatherv_vector (MPI_Comm comm,
const MPI_Datatype datatype,
size_t sendcount,
const vector<T>& sendbuf,
vector<size_t>& recvcounts,
vector<size_t>& rdispls,
vector<T>& recvbuf)
{
int ssize; size_t size;
throw_assert(MPI_Comm_size(comm, &ssize) == MPI_SUCCESS,
"allgatherv: unable to obtain size of MPI communicator");
throw_assert_nomsg(ssize > 0);
size = ssize;

// Exchange counts

/***************************************************************************
* Send MPI data with Allgatherv
**************************************************************************/
recvcounts.resize(size,0);
rdispls.resize(size,0);

// 1. Each rank sends its send size to every other rank, which is
//    used to build the recvcounts and rdispls arrays

{
int status;
MPI_Request request;

status = MPI_Iallgather(&sendcount, 1, MPI_SIZE_T,
&recvcounts[0], 1, MPI_SIZE_T, comm,
&request);
throw_assert(status == MPI_SUCCESS,
"allgatherv: error in MPI_Iallgather: status: " << status);
status = MPI_Wait(&request, MPI_STATUS_IGNORE);
throw_assert(status == MPI_SUCCESS,
"allgatherv: error in MPI_Wait: status: " << status);

}

// 2. Each rank accumulates the vector sizes and allocates
// a receive buffer, recvcounts, and rdispls
rdispls[0] = 0;
size_t recvbuf_size = recvcounts[0];
for (size_t p = 1; p < size; ++p)
{
rdispls[p] = rdispls[p-1] + recvcounts[p-1];
recvbuf_size += recvcounts[p];
}

//assert(recvbuf_size > 0);
recvbuf.resize(recvbuf_size, 0);

{
int status;
MPI_Request request;

// 3. Perform the actual data exchange in chunks
size_t chunk_start = 0;
while (true)
{
size_t global_remaining = 0;
size_t remaining = 0;
if (sendcount > chunk_start)
{
remaining = sendcount - chunk_start;
}

status = MPI_Iallreduce(&remaining,
&global_remaining,
1, MPI_SIZE_T, MPI_SUM,
comm, &request);

throw_assert (status == MPI_SUCCESS, "error in MPI_Iallreduce: status = " << status);
status = MPI_Wait(&request, MPI_STATUS_IGNORE);
throw_assert(status == MPI_SUCCESS,
"allgatherv: error in MPI_Wait: status: " << status);

if (global_remaining == 0)
break;

size_t current_chunk = (chunk_start < sendcount) ? std::min(data::CHUNK_SIZE, sendcount - chunk_start) : 0;

std::vector<int> chunk_recvcounts(size);
std::vector<int> chunk_displs(size);

for (rank_t i = 0; i < size; ++i) {
// Clamp each per-rank receive count to the chunk size so that it
// matches what rank i actually sends in this round.
size_t recv_remaining = (chunk_start < recvcounts[i]) ? (recvcounts[i] - chunk_start) : 0;
chunk_recvcounts[i] = static_cast<int>(std::min(recv_remaining, data::CHUNK_SIZE));
chunk_displs[i] = static_cast<int>(rdispls[i] + chunk_start);
}


status = MPI_Iallgatherv(sendbuf.data() + std::min(chunk_start, sendbuf.size()),
static_cast<int>(current_chunk),
datatype,
&recvbuf[0],
&chunk_recvcounts[0],
&chunk_displs[0],
datatype,
comm, &request);
throw_assert (status == MPI_SUCCESS, "error in MPI_Iallgatherv: status = " << status);
status = MPI_Wait(&request, MPI_STATUS_IGNORE);
throw_assert(status == MPI_SUCCESS,
"allgatherv: error in MPI_Wait: status: " << status);
// Advance uniformly on every rank so the per-rank clamping above stays
// consistent between senders and receivers.
chunk_start = chunk_start + data::CHUNK_SIZE;
}
}

return MPI_SUCCESS;
}
}
}

#endif
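A hypothetical call site for the new template (the surrounding variables are assumptions, not code from this commit): the caller passes its local contribution and gets back the full size_t receive metadata, while the chunked rounds happen inside the function.

std::vector<CELL_IDX_T> local_gids = { /* this rank's cell ids */ };
std::vector<CELL_IDX_T> all_gids;           // gathered entries from every rank
std::vector<size_t> recvcounts, rdispls;    // sized and filled inside the call

throw_assert(neuroh5::mpi::allgatherv_vector<CELL_IDX_T>(comm, MPI_CELL_IDX_T,
                                                         local_gids.size(), local_gids,
                                                         recvcounts, rdispls, all_gids)
             == MPI_SUCCESS,
             "error in allgatherv_vector");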
(Diffs for the remaining 15 changed files were not loaded on this page.)