Skip to content

Commit

Permalink
added chunked write for append_edge_attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
iraikov committed Jan 8, 2025
1 parent 5469888 commit 716681b
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions include/hdf5/hdf5_edge_attributes.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace neuroh5
namespace hdf5
{

// Constants for chunking
constexpr size_t EDGE_ATTR_CHUNK_SIZE = 1ULL << 30; // 1GB chunks for safety margin

void size_edge_attributes
(
Expand Down Expand Up @@ -92,13 +94,15 @@ namespace neuroh5
attr_name,
current_value_size);

MPI_Request request;
size_t my_count = value.size();
std::vector<size_t> all_counts(size, 0);
throw_assert(MPI_Allgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1,
MPI_SIZE_T, comm) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Allgather");
throw_assert(MPI_Barrier(comm) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Barrier");
throw_assert(MPI_Iallgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1,
MPI_SIZE_T, comm, &request) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Iallgather");

throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Wait");

// calculate the total dataset size and the offset of my piece
hsize_t local_value_start = current_value_size,
Expand Down Expand Up @@ -126,10 +130,35 @@ namespace neuroh5
string path = edge_attribute_path(src_pop_name, dst_pop_name,
attr_namespace, attr_name);

status = write<T> (file, path,
global_value_size, local_value_start, local_value_size,
mtype, value, wapl);

while (global_value_size - current_value_size > 0)
{
hsize_t local_write_size = std::min((hsize_t)EDGE_ATTR_CHUNK_SIZE, local_value_size);
std::vector<size_t> all_write_counts(size, 0);

throw_assert(MPI_Iallgather(&local_write_size, 1, MPI_SIZE_T, &all_write_counts[0], 1,
MPI_SIZE_T, comm, &request) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Iallgather");

throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS,
"append_edge_attribute: error in MPI_Wait");

status = write<T> (file, path,
global_value_size, local_value_start, local_write_size,
mtype, value, wapl);

if (local_value_size > 0)
{
local_value_size -= local_write_size;
local_value_start += local_write_size;
}


for (size_t p = 0; p < size; ++p)
{
current_value_size += (hsize_t) all_write_counts[p];
}

}
throw_assert(H5Pclose(wapl) >= 0, "error in H5Pclose");
}

Expand Down

0 comments on commit 716681b

Please sign in to comment.