diff --git a/include/hdf5/hdf5_edge_attributes.hh b/include/hdf5/hdf5_edge_attributes.hh index 1665140..d52e167 100644 --- a/include/hdf5/hdf5_edge_attributes.hh +++ b/include/hdf5/hdf5_edge_attributes.hh @@ -22,6 +22,8 @@ namespace neuroh5 namespace hdf5 { + // Cap on the per-rank size handed to a single write() call by the chunked write loop + constexpr size_t EDGE_ATTR_CHUNK_SIZE = 1ULL << 30; // 2^30; min()'d against local_value_size, so this is 1GB only if that size counts bytes - TODO confirm units void size_edge_attributes ( @@ -92,13 +94,15 @@ namespace neuroh5 attr_name, current_value_size); + MPI_Request request; size_t my_count = value.size(); std::vector all_counts(size, 0); - throw_assert(MPI_Allgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1, - MPI_SIZE_T, comm) == MPI_SUCCESS, - "append_edge_attribute: error in MPI_Allgather"); - throw_assert(MPI_Barrier(comm) == MPI_SUCCESS, - "append_edge_attribute: error in MPI_Barrier"); + throw_assert(MPI_Iallgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1, + MPI_SIZE_T, comm, &request) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Iallgather"); + + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Wait"); // calculate the total dataset size and the offset of my piece hsize_t local_value_start = current_value_size, @@ -126,10 +130,35 @@ namespace neuroh5 string path = edge_attribute_path(src_pop_name, dst_pop_name, attr_namespace, attr_name); - status = write (file, path, - global_value_size, local_value_start, local_value_size, - mtype, value, wapl); - + while (global_value_size - current_value_size > 0) + { + hsize_t local_write_size = std::min((hsize_t)EDGE_ATTR_CHUNK_SIZE, local_value_size); + std::vector all_write_counts(size, 0); + + throw_assert(MPI_Iallgather(&local_write_size, 1, MPI_SIZE_T, &all_write_counts[0], 1, + MPI_SIZE_T, comm, &request) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Iallgather"); + + throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS, + "append_edge_attribute: error in MPI_Wait"); + + status = write (file, path, + 
global_value_size, local_value_start, local_write_size, + mtype, value, wapl); + + if (local_value_size > 0) + { + local_value_size -= local_write_size; + local_value_start += local_write_size; + } + + + for (size_t p = 0; p < size; ++p) + { + current_value_size += (hsize_t) all_write_counts[p]; + } + + } throw_assert(H5Pclose(wapl) >= 0, "error in H5Pclose"); }